Zhilong Hong created FLINK-22077:
------------------------------------
Summary: Wrong way to calculate cross-region
ConsumedPartitionGroups in PipelinedRegionSchedulingStrategy
Key: FLINK-22077
URL: https://issues.apache.org/jira/browse/FLINK-22077
Project: Flink
Issue Type: Bug
Components: Runtime / Coordination
Affects Versions: 1.13.0
Reporter: Zhilong Hong
Fix For: 1.13.0
h3. Introduction
The way we calculate cross-region ConsumedPartitionGroups in
{{PipelinedRegionSchedulingStrategy}} is wrong. It slows down
{{onExecutionStateChange}}, raising its complexity from O(N) to O(N^2). The
semantics of "cross-region" are also wrong.
h3. Details
In {{PipelinedRegionSchedulingStrategy#startScheduling}}, we need to schedule
all regions with no external blocking edges, i.e., source regions. To reduce
the complexity, we schedule all the regions that have no external BLOCKING
ConsumedPartitionGroups.
However, in the case illustrated in FLINK-22017, region 2 has a
ConsumedPartitionGroup that contains both internal and external blocking
IntermediateResultPartitions. If we pick one partition to represent the entire
ConsumedPartitionGroup, we may pick the internal one and treat the entire
group as internal. Region 2 will then be scheduled.
As region 1 is not finished, region 2 cannot transition to running. A deadlock
may happen if there are not enough resources for both regions.
To fix this, we introduced cross-region ConsumedPartitionGroups in
FLINK-21330. Regions that have ConsumedPartitionGroups with both internal
and external blocking IntermediateResultPartitions are recorded. When we
call {{startScheduling}}, these ConsumedPartitionGroups are treated as
external, and region 2 is not scheduled.
However, the implementation of cross-region groups is wrong. Every
ConsumedPartitionGroup that has multiple producer regions is treated as a
cross-region group, which is not the logic described above. As a result, all
ALL-TO-ALL BLOCKING ConsumedPartitionGroups are treated as cross-region
groups, since their producers are in different regions (each producer has its
own region). This raises the complexity from O(N) to O(N^2) for ALL-TO-ALL
BLOCKING edges.
h3. Solution
To calculate the cross-region ConsumedPartitionGroups correctly, we can first
calculate the producer regions of every ConsumedPartitionGroup, and then
iterate over all regions and their ConsumedPartitionGroups. If a
ConsumedPartitionGroup has two or more producer regions, and those producer
regions include the current region, it is a cross-region
ConsumedPartitionGroup. This matches the intended semantics and makes sure
ALL-TO-ALL BLOCKING ConsumedPartitionGroups are not treated as cross-region
groups. The fix also reduces the complexity from O(N^2) back to O(N). I think
it is necessary to include this bug fix in release 1.13.
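The check described in the solution could be sketched roughly as follows. This is only an illustration, not the actual patch: regions and ConsumedPartitionGroups are modelled as plain string ids, and the class {{CrossRegionGroupSketch}}, its methods, and the two example topologies are hypothetical names and simplified stand-ins rather than Flink classes or the exact graph from FLINK-22017.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical sketch: regions and groups are plain string ids, not Flink classes.
final class CrossRegionGroupSketch {

    /**
     * @param producerRegionsByGroup group id -> ids of the regions producing its partitions
     * @param consumedGroupsByRegion region id -> ids of the groups consumed inside that region
     * @return ids of the groups that are cross-region for at least one consuming region
     */
    static Set<String> findCrossRegionGroups(
            Map<String, Set<String>> producerRegionsByGroup,
            Map<String, List<String>> consumedGroupsByRegion) {
        Set<String> crossRegionGroups = new HashSet<>();
        for (Map.Entry<String, List<String>> entry : consumedGroupsByRegion.entrySet()) {
            String region = entry.getKey();
            for (String group : entry.getValue()) {
                Set<String> producers =
                        producerRegionsByGroup.getOrDefault(group, Collections.<String>emptySet());
                // Cross-region only if the group mixes internal and external producers:
                // several producer regions, one of which is the consuming region itself.
                if (producers.size() >= 2 && producers.contains(region)) {
                    crossRegionGroups.add(group);
                }
            }
        }
        return crossRegionGroups;
    }

    /** ALL-TO-ALL BLOCKING edge: two producers and two consumers, each in its own region. */
    static Set<String> allToAllExample() {
        Map<String, Set<String>> producers = new HashMap<>();
        producers.put("g1", new HashSet<>(Arrays.asList("p1", "p2")));
        Map<String, List<String>> consumed = new HashMap<>();
        consumed.put("c1", Arrays.asList("g1"));
        consumed.put("c2", Arrays.asList("g1"));
        return findCrossRegionGroups(producers, consumed);
    }

    /** Simplified mixed case: region2 consumes a group produced by region1 and region2. */
    static Set<String> mixedExample() {
        Map<String, Set<String>> producers = new HashMap<>();
        producers.put("g2", new HashSet<>(Arrays.asList("region1", "region2")));
        Map<String, List<String>> consumed = new HashMap<>();
        consumed.put("region1", Collections.<String>emptyList());
        consumed.put("region2", Arrays.asList("g2"));
        return findCrossRegionGroups(producers, consumed);
    }
}
```

In this sketch, {{allToAllExample()}} returns an empty set, because neither consumer region is among the producer regions of the group, while {{mixedExample()}} returns only {{g2}}. Once the producer regions are computed up front, each (region, group) pair costs a single set lookup.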