[
https://issues.apache.org/jira/browse/BEAM-9824?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17094196#comment-17094196
]
David Morávek commented on BEAM-9824:
-------------------------------------
Relevant Flink mailing list discussion:
https://lists.apache.org/thread.html/rc811150f8eb8635334b990a9e79d9abcbe301a8f88066597d59b34a8%40%3Cdev.flink.apache.org%3E
> Multiple reshuffles are ignored in some cases on Flink batch runner.
> --------------------------------------------------------------------
>
> Key: BEAM-9824
> URL: https://issues.apache.org/jira/browse/BEAM-9824
> Project: Beam
> Issue Type: Bug
> Components: runner-flink
> Affects Versions: 2.19.0, 2.20.0
> Reporter: David Morávek
> Assignee: David Morávek
> Priority: Major
> Fix For: 2.22.0
>
> Time Spent: 50m
> Remaining Estimate: 0h
>
> Multiple reshuffles are ignored in some cases on the Flink batch runner. This
> may lead to a huge performance penalty in IO connectors (when reshuffling splits).
> In the Flink optimizer, when we `.rebalance()` a dataset, its output channel is
> marked as `FORCED_REBALANCED`. When we chain this with another
> `.rebalance()`, the latter is ignored because its source is already
> `FORCED_REBALANCED`, so the requested property is already met. This is correct
> behaviour because rebalance is idempotent.
> When we include a `flatMap` between the rebalances ->
> `.rebalance().flatMap(...).rebalance()`, we need to reshuffle again, because
> the dataset distribution may have changed (e.g. you can possibly emit an
> unbounded stream from a single element). Unfortunately, the `flatMap` output
> is still incorrectly marked as `FORCED_REBALANCED` and the second reshuffle
> gets ignored.
> This especially affects IO connectors -> `FileIO.match()` returns a reshuffled
> list of matched files -> we split each file into ranges -> **reshuffle** ->
> read. Ignoring the second reshuffle leads to a huge perf. degradation (5m -> 2h
> in one of our production pipelines).
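
To make the skew described in the quoted issue concrete, here is a toy model in plain Python. It is only a sketch, not Flink or Beam code: `rebalance`, `flat_map`, `split_into_ranges` and the file sizes are hypothetical stand-ins, and the round-robin redistribution is a simplification of what a real rebalance does. It shows why the distribution after `flatMap` can no longer be assumed to be balanced.

```python
def rebalance(partitions):
    """Toy rebalance: redistribute all elements round-robin over the same number of partitions."""
    n = len(partitions)
    out = [[] for _ in range(n)]
    i = 0
    for part in partitions:
        for item in part:
            out[i % n].append(item)
            i += 1
    return out


def flat_map(partitions, fn):
    """Toy flatMap: expand each element inside its own partition; nothing moves between partitions."""
    return [[o for item in part for o in fn(item)] for part in partitions]


def split_into_ranges(file):
    """Hypothetical split step: a file of `size` units yields `size` ranges."""
    name, size = file
    return [(name, k) for k in range(size)]


# Four matched files, one much larger than the others (made-up sizes).
files = [("big", 1000), ("a", 1), ("b", 1), ("c", 1)]

# First reshuffle: one file per partition, which looks perfectly balanced.
partitions = rebalance([files, [], [], []])

# Splitting files into ranges changes the distribution without moving data.
ranges = flat_map(partitions, split_into_ranges)
skewed = [len(p) for p in ranges]

# What the second reshuffle is supposed to restore.
balanced = [len(p) for p in rebalance(ranges)]

print("second reshuffle ignored:", skewed)
print("second reshuffle applied:", balanced)
```

In this model, skipping the second reshuffle leaves one partition with almost all of the ranges to read, which matches the kind of slowdown reported in the issue.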