[ https://issues.apache.org/jira/browse/BEAM-9824?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Ismaël Mejía updated BEAM-9824:
-------------------------------
Status: Open (was: Triage Needed)
> Multiple reshuffles are ignored in some cases on Flink batch runner.
> --------------------------------------------------------------------
>
> Key: BEAM-9824
> URL: https://issues.apache.org/jira/browse/BEAM-9824
> Project: Beam
> Issue Type: Bug
> Components: runner-flink
> Affects Versions: 2.19.0, 2.20.0
> Reporter: David Morávek
> Assignee: David Morávek
> Priority: Major
> Fix For: 2.21.0
>
> Time Spent: 10m
> Remaining Estimate: 0h
>
> Multiple reshuffles are ignored in some cases on the Flink batch runner. This
> may lead to a huge performance penalty in IO connectors (when reshuffling
> splits).
> In the Flink optimizer, when we `.rebalance()` a dataset, its output channel
> is marked as `FORCED_REBALANCED`. When we chain this with another
> `.rebalance()`, the latter is ignored because its source is already
> `FORCED_REBALANCED`, so the requested property is already met. This is
> correct behaviour, because rebalance is idempotent.
> When we include a `flatMap` between the rebalances ->
> `.rebalance().flatMap(...).rebalance()`, we need to reshuffle again, because
> the dataset distribution may have changed (e.g. you can possibly emit an
> unbounded stream from a single element). Unfortunately, the `flatMap` output
> is still incorrectly marked as `FORCED_REBALANCED`, and the second reshuffle
> gets ignored.
> This especially affects IO connectors -> `FileIO.match()` returns a
> reshuffled list of matched files -> we split each file into ranges ->
> **reshuffle** -> read. Ignoring the second reshuffle leads to a huge
> performance degradation (5m -> 2h in one of our production pipelines).
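The property-propagation problem described above can be illustrated with a small, self-contained toy model. This is not Flink's optimizer code: the class `RebalanceModel`, the `Partitioning` and `Op` enums and the `effectiveShuffles` method are hypothetical names, and the model only tracks whether a channel is marked `FORCED_REBALANCED`. It counts how many `.rebalance()` steps in a plan actually shuffle data, with and without the assumption that `flatMap` preserves that marking.

```java
import java.util.List;

/**
 * Hypothetical toy model (not Flink's optimizer) of how a channel's
 * partitioning property flows through rebalance and flatMap operators.
 */
public class RebalanceModel {
    /** Simplified channel property: either unknown or forced-rebalanced. */
    enum Partitioning { ANY, FORCED_REBALANCED }

    /** Hypothetical operator kinds used in the plan. */
    enum Op { REBALANCE, FLAT_MAP }

    /**
     * Returns how many rebalance steps actually trigger a shuffle.
     * A rebalance is skipped when its input is already FORCED_REBALANCED.
     * If flatMapPreservesRebalance is true, flatMap output keeps the
     * FORCED_REBALANCED property (the behaviour reported in BEAM-9824).
     */
    static int effectiveShuffles(List<Op> plan, boolean flatMapPreservesRebalance) {
        Partitioning current = Partitioning.ANY;
        int shuffles = 0;
        for (Op op : plan) {
            switch (op) {
                case REBALANCE:
                    // Only shuffle if the requested property is not already met.
                    if (current != Partitioning.FORCED_REBALANCED) {
                        shuffles++;
                    }
                    current = Partitioning.FORCED_REBALANCED;
                    break;
                case FLAT_MAP:
                    // A flatMap may change the data distribution arbitrarily,
                    // so the correct behaviour is to drop the property.
                    if (!flatMapPreservesRebalance) {
                        current = Partitioning.ANY;
                    }
                    break;
            }
        }
        return shuffles;
    }

    public static void main(String[] args) {
        List<Op> plan = List.of(Op.REBALANCE, Op.FLAT_MAP, Op.REBALANCE);
        System.out.println(effectiveShuffles(plan, true));  // buggy: prints 1
        System.out.println(effectiveShuffles(plan, false)); // fixed: prints 2
    }
}
```

With `flatMapPreservesRebalance = true` (the behaviour reported here), the plan `.rebalance().flatMap(...).rebalance()` shuffles only once; clearing the property after `flatMap` restores the second shuffle, while two back-to-back rebalances still collapse into a single one, matching the idempotence argument.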
--
This message was sent by Atlassian Jira
(v8.3.4#803005)