[
https://issues.apache.org/jira/browse/BEAM-5330?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Beam JIRA Bot reassigned BEAM-5330:
-----------------------------------
Assignee: (was: David Morávek)
> Support zero-shuffle grouping operations
> ----------------------------------------
>
> Key: BEAM-5330
> URL: https://issues.apache.org/jira/browse/BEAM-5330
> Project: Beam
> Issue Type: Improvement
> Components: dsl-euphoria
> Reporter: Jan Lukavský
> Priority: P2
> Labels: performance, stale-assigned
>
> Sometimes the input dataset is already correctly shuffled (e.g. as a result
> of previous operations), so a subsequent grouping operation could leverage
> this and skip the unneeded shuffle. Example (pseudocode):
> {code:java}
> Dataset<Integer> input = ...
> Dataset<Pair<Integer, Long>> counts1 = CountByKey.of(input)
>     .keyBy(e -> e)
>     .windowBy( /* some small window */ )
>     .output();
> Dataset<Pair<Integer, Long>> counts2 = SumByKey.of(counts1)
>     .keyBy(Pair::getFirst)
>     .windowBy( /* larger window */ )
>     .output();
> {code}
> Now, the second {{ReduceByKey}} may already have the correct shuffle
> (depending on the runner), but cannot leverage it, because it isn't aware
> that the grouping key has not changed from the previous operation.
> Proposed change:
> {code:java}
> Dataset<Integer> input = ...
> Dataset<Pair<Integer, Long>> counts1 = CountByKey.of(input)
>     .keyBy(e -> e)
>     .windowBy( /* some small window */ )
>     .output();
> Dataset<Pair<Integer, Long>> counts2 = SumByKey.of(counts1)
>     .keyByLocally(Pair::getFirst)
>     .windowBy( /* larger window */ )
>     .output();
> {code}
> Introduce {{keyByLocally}} to keyed operations, which tells the runner
> that the grouping is preserved from one keyed operator to the next.
> This will probably require some support on the Beam SDK side, because this
> information has to be passed to the runner (so that e.g. FlinkRunner can make
> use of something like {{DataStreamUtils#reinterpretAsKeyedStream}}).
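
For illustration only, here is a runner-independent sketch of why the second shuffle is redundant when the grouping key is unchanged. It is plain Java and does not use Euphoria or Beam API. The names ZeroShuffleSketch, partitionOf, countByKey, sumWithShuffle and sumLocally are hypothetical, and hash partitioning over three partitions is only an assumption about how a runner might lay out keyed data. Each inner list of the input stands in for one "small window"; the second stage sums over all of them, standing in for the "larger window".

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

class ZeroShuffleSketch {

    // Assumption: the runner hash-partitions keyed data over a fixed number of partitions.
    static final int PARTITIONS = 3;

    static int partitionOf(int key) {
        return Math.floorMod(Integer.hashCode(key), PARTITIONS);
    }

    static List<List<Map.Entry<Integer, Long>>> emptyPartitions() {
        List<List<Map.Entry<Integer, Long>>> parts = new ArrayList<>();
        for (int p = 0; p < PARTITIONS; p++) {
            parts.add(new ArrayList<>());
        }
        return parts;
    }

    // Stage 1: count per key within each small window. The (key, count) pairs
    // are placed in the partition that owns the key, i.e. laid out as if
    // they had been shuffled by key.
    static List<List<Map.Entry<Integer, Long>>> countByKey(List<List<Integer>> windows) {
        List<List<Map.Entry<Integer, Long>>> parts = emptyPartitions();
        for (List<Integer> window : windows) {
            Map<Integer, Long> counts = new TreeMap<>();
            for (int element : window) {
                counts.merge(element, 1L, Long::sum);
            }
            for (Map.Entry<Integer, Long> c : counts.entrySet()) {
                parts.get(partitionOf(c.getKey()))
                     .add(new AbstractMap.SimpleImmutableEntry<>(c.getKey(), c.getValue()));
            }
        }
        return parts;
    }

    // Stage 2 in the style of keyBy: route every pair to its key's partition again, then sum.
    static Map<Integer, Long> sumWithShuffle(List<List<Map.Entry<Integer, Long>>> parts) {
        List<List<Map.Entry<Integer, Long>>> reshuffled = emptyPartitions();
        for (List<Map.Entry<Integer, Long>> part : parts) {
            for (Map.Entry<Integer, Long> pair : part) {
                reshuffled.get(partitionOf(pair.getKey())).add(pair);
            }
        }
        return sumLocally(reshuffled);
    }

    // Stage 2 in the style of the proposed keyByLocally: sum inside each partition as it is.
    // This is correct only if no key appears in more than one partition.
    static Map<Integer, Long> sumLocally(List<List<Map.Entry<Integer, Long>>> parts) {
        Map<Integer, Long> result = new TreeMap<>();
        for (List<Map.Entry<Integer, Long>> part : parts) {
            Map<Integer, Long> local = new TreeMap<>();
            for (Map.Entry<Integer, Long> pair : part) {
                local.merge(pair.getKey(), pair.getValue(), Long::sum);
            }
            for (Map.Entry<Integer, Long> sum : local.entrySet()) {
                if (result.putIfAbsent(sum.getKey(), sum.getValue()) != null) {
                    throw new IllegalStateException("key " + sum.getKey() + " spans partitions");
                }
            }
        }
        return result;
    }

    public static void main(String[] args) {
        List<List<Integer>> windows = Arrays.asList(
            Arrays.asList(1, 2, 2, 3),
            Arrays.asList(2, 3, 3, 4),
            Arrays.asList(1, 4, 4, 4));
        List<List<Map.Entry<Integer, Long>>> counts1 = countByKey(windows);
        System.out.println(sumWithShuffle(counts1)); // prints {1=2, 2=3, 3=3, 4=4}
        System.out.println(sumLocally(counts1));     // prints {1=2, 2=3, 3=3, 4=4}
    }
}
```

Both variants give the same sums because the first stage already placed every key in exactly one partition. The local variant is only safe under that precondition, which is why the information would have to be declared by the user and passed to the runner, as the description says.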
--
This message was sent by Atlassian Jira
(v8.3.4#803005)