[ https://issues.apache.org/jira/browse/FLINK-33123?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Zhanghao Chen updated FLINK-33123:
----------------------------------
Description:
*Background*
https://issues.apache.org/jira/browse/FLINK-30213 reported that a FORWARD edge
becomes invalid when the parallelism of a vertex on that edge is changed, which
both the autoscaler and the adaptive scheduler can do dynamically. The fix
dynamically replaces the partitioner from FORWARD to REBALANCE on task
deployment in {{StreamTask}}:
!image-2023-09-20-15-09-22-733.png|width=560,height=221!
*Problem*
Unfortunately, the fix is still buggy in two aspects:
# The connections between upstream and downstream tasks are determined by the
distribution type of the partitioner when the execution graph is generated on
the JM side. For a FORWARD edge the distribution type is POINTWISE, and Flink
tries to distribute the subpartitions evenly across the downstream tasks, so
each upstream task is connected to only a subset of them. Switching to
REBALANCE therefore also requires switching the distribution type to
ALL_TO_ALL, so that every upstream task is connected to every downstream task.
However, the fix did not change the distribution type, so the network
connections are set up incorrectly.
# The FORWARD partitioner is replaced only if
{{environment.getWriter(outputIndex).getNumberOfSubpartitions()}} does not
equal the task parallelism. However, the number of subpartitions here equals
the number of downstream tasks connected to this particular task, which is also
determined by the distribution type of the partitioner when the execution graph
is generated on the JM side. When ceil(downstream task parallelism / upstream
task parallelism) = upstream task parallelism, the upstream tasks that get that
many subpartitions have a number of subpartitions equal to the task
parallelism. For example, for a topology A (parallelism 3) -> B (parallelism
8), two A tasks have 3 subpartitions and one A task has 2, so two tasks have a
number of subpartitions equal to the task parallelism 3 and skip the
partitioner replacement. In fact, for a normal job with a FORWARD edge and no
autoscaling action, the partitioner is changed to REBALANCE internally whenever
the parallelism is greater than 1, because the number of subpartitions is
always 1 in this case.
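The A (parallelism 3) -> B (parallelism 8) example can be reproduced with a small standalone model. This is a hypothetical sketch, not Flink code: {{PointwiseModel}}, {{pointwiseConsumers}}, {{replacesForward}} and {{describe}} are illustrative names, it assumes one subpartition per connected downstream task, and it assumes POINTWISE connections split the downstream tasks into contiguous ranges using ceiling division when the upstream parallelism is not larger than the downstream one. For each A task it prints the B tasks it is wired to, its number of subpartitions, and whether the FORWARD partitioner would be replaced under the "number of subpartitions != task parallelism" rule.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical standalone model, not Flink code: it mimics how a POINTWISE edge
// wires upstream tasks to downstream tasks and applies the replacement rule
// "replace FORWARD when number of subpartitions != task parallelism".
public final class PointwiseModel {

    private PointwiseModel() {}

    static int ceilDiv(int a, int b) {
        return (a + b - 1) / b;
    }

    // Downstream task indices connected to one upstream task over a POINTWISE edge.
    // Assumption: upstream parallelism <= downstream parallelism, and downstream
    // tasks are split into contiguous ranges using ceiling division.
    static List<Integer> pointwiseConsumers(
            int upstreamIndex, int upstreamParallelism, int downstreamParallelism) {
        if (upstreamParallelism > downstreamParallelism) {
            throw new IllegalArgumentException("model only covers upstream <= downstream");
        }
        int start = ceilDiv(upstreamIndex * downstreamParallelism, upstreamParallelism);
        int end = ceilDiv((upstreamIndex + 1) * downstreamParallelism, upstreamParallelism);
        List<Integer> consumers = new ArrayList<>();
        for (int j = start; j < end; j++) {
            consumers.add(j);
        }
        return consumers;
    }

    // Replacement rule as described above: one subpartition per connected
    // downstream task, and FORWARD is replaced only when that count differs
    // from the task parallelism.
    static boolean replacesForward(int numSubpartitions, int taskParallelism) {
        return numSubpartitions != taskParallelism;
    }

    // Prints the wiring and the replacement decision for every upstream task.
    static void describe(int upstreamParallelism, int downstreamParallelism) {
        for (int i = 0; i < upstreamParallelism; i++) {
            List<Integer> consumers =
                    pointwiseConsumers(i, upstreamParallelism, downstreamParallelism);
            System.out.printf(
                    "A[%d] -> B%s, subpartitions=%d, replaced=%b%n",
                    i,
                    consumers,
                    consumers.size(),
                    replacesForward(consumers.size(), upstreamParallelism));
        }
    }

    public static void main(String[] args) {
        describe(3, 8); // the A (3) -> B (8) example
        describe(4, 4); // a plain FORWARD edge without rescaling
    }
}
```

Under these assumptions, the 3 -> 8 case wires A[0] to B[0, 1, 2], A[1] to B[3, 4, 5] and A[2] to B[6, 7], so only A[2] switches to REBALANCE. Even a task that does switch still only reaches its POINTWISE consumers, which is the wiring problem from the first point. In the 4 -> 4 case every task has a single subpartition, so every task switches.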
was:
*Background*
https://issues.apache.org/jira/browse/FLINK-30213 reported that a FORWARD edge
becomes invalid when the parallelism of a vertex on that edge is changed, which
both the autoscaler and the adaptive scheduler can do dynamically. The fix
dynamically replaces the partitioner from FORWARD to REBALANCE on task
deployment in {{StreamTask}}:
{noformat}
 private static void replaceForwardPartitionerIfConsumerParallelismDoesNotMatch(
-        Environment environment, NonChainedOutput streamOutput) {
+        Environment environment, NonChainedOutput streamOutput, int outputIndex) {
     if (streamOutput.getPartitioner() instanceof ForwardPartitioner
-            && streamOutput.getConsumerParallelism()
+            && environment.getWriter(outputIndex).getNumberOfSubpartitions()
                     != environment.getTaskInfo().getNumberOfParallelSubtasks()) {
         LOG.debug(
                 "Replacing forward partitioner with rebalance for {}",
                 environment.getTaskInfo().getTaskNameWithSubtasks());
         streamOutput.setPartitioner(new RebalancePartitioner<>());
     }
 }
{noformat}
*Problem*
Unfortunately, the fix is still buggy in two aspects:
# The connections between upstream and downstream tasks are determined by the
distribution type of the partitioner when the execution graph is generated on
the JM side. For a FORWARD edge the distribution type is POINTWISE, and Flink
tries to distribute the subpartitions evenly across the downstream tasks, so
each upstream task is connected to only a subset of them. Switching to
REBALANCE therefore also requires switching the distribution type to
ALL_TO_ALL, so that every upstream task is connected to every downstream task.
However, the fix did not change the distribution type, so the network
connections are set up incorrectly.
# The FORWARD partitioner is replaced only if
{{environment.getWriter(outputIndex).getNumberOfSubpartitions()}} does not
equal the task parallelism. However, the number of subpartitions here equals
the number of downstream tasks connected to this particular task, which is also
determined by the distribution type of the partitioner when the execution graph
is generated on the JM side. When ceil(downstream task parallelism / upstream
task parallelism) = upstream task parallelism, the upstream tasks that get that
many subpartitions have a number of subpartitions equal to the task
parallelism. In fact, for a normal job with a FORWARD edge and no autoscaling
action, the partitioner is changed to REBALANCE internally whenever the
parallelism is greater than 1, because the number of subpartitions is always 1
in this case.
> Wrong dynamic replacement of partitioner from FORWARD to REBLANCE for
> autoscaler and adaptive scheduler and
> -------------------------------------------------------------------------------------------------------------
>
> Key: FLINK-33123
> URL: https://issues.apache.org/jira/browse/FLINK-33123
> Project: Flink
> Issue Type: Bug
> Components: Autoscaler, Runtime / Coordination
> Affects Versions: 1.17.0, 1.18.0
> Reporter: Zhanghao Chen
> Priority: Critical
> Attachments: image-2023-09-20-15-09-22-733.png
>
--
This message was sent by Atlassian Jira
(v8.20.10#820010)