[ 
https://issues.apache.org/jira/browse/FLINK-33123?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhanghao Chen updated FLINK-33123:
----------------------------------
    Attachment: image-2023-09-20-15-09-22-733.png

> Wrong dynamic replacement of partitioner from FORWARD to REBALANCE for 
> autoscaler and adaptive scheduler
> -------------------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-33123
>                 URL: https://issues.apache.org/jira/browse/FLINK-33123
>             Project: Flink
>          Issue Type: Bug
>          Components: Autoscaler, Runtime / Coordination
>    Affects Versions: 1.17.0, 1.18.0
>            Reporter: Zhanghao Chen
>            Priority: Critical
>         Attachments: image-2023-09-20-15-09-22-733.png
>
>
> *Background*
> https://issues.apache.org/jira/browse/FLINK-30213 reported that the edge is 
> wrong when the parallelism of a vertex with a FORWARD edge is changed. This 
> affects both the autoscaler and the adaptive scheduler, where the vertex 
> parallelism can be changed dynamically. The fix dynamically replaces the 
> partitioner from FORWARD to REBALANCE on task deployment in 
> {{StreamTask}}: 
> {{    private static void replaceForwardPartitionerIfConsumerParallelismDoesNotMatch(}}
> {{            Environment environment, NonChainedOutput streamOutput, int outputIndex) {}}
> {{        if (streamOutput.getPartitioner() instanceof ForwardPartitioner}}
> {{                && environment.getWriter(outputIndex).getNumberOfSubpartitions()}}
> {{                        != environment.getTaskInfo().getNumberOfParallelSubtasks()) {}}
> {{            LOG.debug(}}
> {{                    "Replacing forward partitioner with rebalance for {}",}}
> {{                    environment.getTaskInfo().getTaskNameWithSubtasks());}}
> {{            streamOutput.setPartitioner(new RebalancePartitioner<>());}}
> {{        }}}
> {{    }}}
> *Problem*
> Unfortunately, the fix is still buggy in two respects:
>  # The connections between upstream and downstream tasks are determined by 
> the distribution type of the partitioner when the execution graph is 
> generated on the JM side. When the edge is FORWARD, the distribution type is 
> POINTWISE, and Flink tries to distribute subpartitions evenly across all 
> downstream tasks. To change the edge to REBALANCE, the distribution type 
> must also be changed to ALL_TO_ALL so that all-to-all connections are made 
> between upstream and downstream tasks. However, the fix did not change the 
> distribution type, so the network connections are set up incorrectly.
>  # The FORWARD partitioner is replaced if 
> environment.getWriter(outputIndex).getNumberOfSubpartitions() does not equal 
> the task parallelism. However, the number of subpartitions here equals the 
> number of downstream tasks of this particular task, which is also determined 
> by the distribution type of the partitioner when the execution graph is 
> generated on the JM side. When ceil(downstream task parallelism / upstream 
> task parallelism) = upstream task parallelism, the number of subpartitions 
> equals the task parallelism, so the partitioner is not replaced. In fact, 
> for a normal job with a FORWARD edge and no autoscaling action, you will 
> find that the partitioner is changed to REBALANCE internally, because the 
> number of subpartitions always equals 1 in this case.
>  
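
To make the second point concrete, here is a minimal sketch of how the check behaves under a POINTWISE edge. It is not Flink code: the names DistributionType, numberOfSubpartitions and checkReplacesForward are hypothetical, and the consumer assignment is a simplified model assumed for illustration (each downstream subtask j reads from upstream subtask floor(j * upstream / downstream) when downstream > upstream, otherwise every upstream subtask has exactly one consumer).

```java
final class ForwardEdgeCheckSketch {

    /** Hypothetical stand-in for the two distribution types named in the issue. */
    enum DistributionType { POINTWISE, ALL_TO_ALL }

    /**
     * Number of subpartitions one upstream subtask writes to, under a simplified
     * model (an assumption for illustration, not Flink's actual connection logic).
     */
    static int numberOfSubpartitions(
            DistributionType type,
            int upstreamIndex,
            int upstreamParallelism,
            int downstreamParallelism) {
        if (type == DistributionType.ALL_TO_ALL) {
            // Every upstream subtask is connected to every downstream subtask.
            return downstreamParallelism;
        }
        if (downstreamParallelism <= upstreamParallelism) {
            // POINTWISE with no fan-out: one consumer per upstream subtask.
            return 1;
        }
        // POINTWISE with fan-out: count the downstream subtasks mapped to this upstream subtask.
        int count = 0;
        for (int j = 0; j < downstreamParallelism; j++) {
            if (j * upstreamParallelism / downstreamParallelism == upstreamIndex) {
                count++;
            }
        }
        return count;
    }

    /** Mirrors the condition quoted in the issue: replace when the two numbers differ. */
    static boolean checkReplacesForward(int numberOfSubpartitions, int taskParallelism) {
        return numberOfSubpartitions != taskParallelism;
    }

    public static void main(String[] args) {
        // {upstream parallelism, downstream parallelism}
        int[][] cases = {{4, 4}, {2, 4}};
        for (int[] c : cases) {
            int subs = numberOfSubpartitions(DistributionType.POINTWISE, 0, c[0], c[1]);
            System.out.printf(
                    "upstream=%d downstream=%d subpartitions=%d replaced=%b%n",
                    c[0], c[1], subs, checkReplacesForward(subs, c[0]));
        }
    }
}
```

Under this model, an unscaled 4 -> 4 FORWARD edge has one subpartition per task, so the check fires although nothing changed, while a 2 -> 4 edge has two subpartitions per task, so the check does not fire although the parallelisms differ. With ALL_TO_ALL the subpartition count would simply be the downstream parallelism.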



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
