[ 
https://issues.apache.org/jira/browse/BEAM-5330?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Beam JIRA Bot reassigned BEAM-5330:
-----------------------------------

    Assignee:     (was: David Morávek)

> Support zero-shuffle grouping operations
> ----------------------------------------
>
>                 Key: BEAM-5330
>                 URL: https://issues.apache.org/jira/browse/BEAM-5330
>             Project: Beam
>          Issue Type: Improvement
>          Components: dsl-euphoria
>            Reporter: Jan Lukavský
>            Priority: P2
>              Labels: performance, stale-assigned
>
> Sometimes the input dataset is already correctly shuffled (e.g. as a 
> result of previous operation(s)), which means that a subsequent grouping 
> operation could leverage this and skip the unneeded shuffle. Example 
> (pseudocode):
> {code:java}
>  Dataset<Integer> input = ...
>  Dataset<Pair<Integer, Long>> counts1 = CountByKey.of(input)
>    .keyBy(e -> e)
>    .windowBy( /* some small window */ )
>    .output();
>  Dataset<Pair<Integer, Long>> counts2 = SumByKey.of(counts1)
>    .keyBy(Pair::getFirst)
>    .windowBy( /* larger window */ )
>    .output();
> {code}
> Now, the second keyed operation ({{SumByKey}}) might already have the 
> correct shuffle (depending on the runner), but it cannot leverage this, 
> because it is not aware that the grouping key has not changed from the 
> previous operation.
> Proposed change:
> {code:java}
>  Dataset<Integer> input = ...
>  Dataset<Pair<Integer, Long>> counts1 = CountByKey.of(input)
>    .keyBy(e -> e)
>    .windowBy( /* some small window */ )
>    .output();
>  Dataset<Pair<Integer, Long>> counts2 = SumByKey.of(counts1)
>    .keyByLocally(Pair::getFirst)
>    .windowBy( /* larger window */ )
>    .output();
> {code}
> Introduce {{keyByLocally}} to keyed operations, which will tell the runner 
> that the grouping is preserved from one keyed operator to the next.
> This will probably require some support on the Beam SDK side, because this 
> information has to be passed to the runner (so that e.g. FlinkRunner can make 
> use of something like {{DataStreamUtils#reinterpretAsKeyedStream}}).
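
To illustrate why a second shuffle is redundant when the grouping key is unchanged, here is a minimal plain-Java sketch. It is not Beam, Euphoria, or Flink code: the class ZeroShuffleSketch and the helpers partitionOf, countByKey, sumByKeyLocally, and run are hypothetical names invented for this example, and hash partitioning is only an assumed stand-in for whatever shuffle a runner actually performs. Small windows are modelled as separate input lists, and the larger window as their union.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class ZeroShuffleSketch {

    /** Hypothetical stand-in for a runner's shuffle: picks the partition that owns a key. */
    static int partitionOf(int key, int numPartitions) {
        return Math.floorMod(Integer.hashCode(key), numPartitions);
    }

    /**
     * First keyed operation (the only shuffle): counts per key for a single
     * small window, returned as one map per partition.
     */
    static List<Map<Integer, Long>> countByKey(List<Integer> window, int numPartitions) {
        List<Map<Integer, Long>> partitions = new ArrayList<>();
        for (int i = 0; i < numPartitions; i++) {
            partitions.add(new HashMap<>());
        }
        for (int key : window) {
            partitions.get(partitionOf(key, numPartitions)).merge(key, 1L, Long::sum);
        }
        return partitions;
    }

    /**
     * Second keyed operation with the same key: sums the small-window counts
     * partition by partition, without moving any record between partitions.
     */
    static List<Map<Integer, Long>> sumByKeyLocally(
            List<List<Map<Integer, Long>>> countsPerWindow, int numPartitions) {
        List<Map<Integer, Long>> sums = new ArrayList<>();
        for (int p = 0; p < numPartitions; p++) {
            Map<Integer, Long> local = new HashMap<>();
            for (List<Map<Integer, Long>> window : countsPerWindow) {
                window.get(p).forEach((key, count) -> local.merge(key, count, Long::sum));
            }
            sums.add(local);
        }
        return sums;
    }

    /** Runs both stages over the given small windows and flattens the per-partition results. */
    static Map<Integer, Long> run(List<List<Integer>> smallWindows, int numPartitions) {
        List<List<Map<Integer, Long>>> counts = new ArrayList<>();
        for (List<Integer> window : smallWindows) {
            counts.add(countByKey(window, numPartitions));
        }
        Map<Integer, Long> result = new HashMap<>();
        for (Map<Integer, Long> partition : sumByKeyLocally(counts, numPartitions)) {
            // Each key lives in exactly one partition, so putAll cannot overwrite an entry.
            result.putAll(partition);
        }
        return result;
    }

    public static void main(String[] args) {
        List<List<Integer>> smallWindows = List.of(List.of(1, 2, 2, 3), List.of(2, 3, 3, 7));
        System.out.println(run(smallWindows, 4));
    }
}
```

Because every record for a given key already sits in the partition chosen by the first operation, the local sum yields the same totals as a full regroup would. This is the property that the proposed keyByLocally would let a runner rely on.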



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
