Repository: beam
Updated Branches:
  refs/heads/DSL_SQL bf39926b4 -> 8e9b930bc
[BEAM-2477] BeamAggregationRel should use Combine.perKey instead of GroupByKey Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/890dd03d Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/890dd03d Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/890dd03d Branch: refs/heads/DSL_SQL Commit: 890dd03d6d3643a6ac39d041038cc110d6317ef9 Parents: bf39926 Author: JingsongLi <[email protected]> Authored: Tue Jun 20 09:20:19 2017 +0800 Committer: Tyler Akidau <[email protected]> Committed: Thu Jun 22 13:27:53 2017 -0700 ---------------------------------------------------------------------- .../beam/dsls/sql/rel/BeamAggregationRel.java | 20 +++++++------------- 1 file changed, 7 insertions(+), 13 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/890dd03d/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamAggregationRel.java ---------------------------------------------------------------------- diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamAggregationRel.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamAggregationRel.java index 7a1d003a..701f620 100644 --- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamAggregationRel.java +++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamAggregationRel.java @@ -25,10 +25,8 @@ import org.apache.beam.dsls.sql.schema.BeamSqlRow; import org.apache.beam.dsls.sql.schema.BeamSqlRowCoder; import org.apache.beam.dsls.sql.transform.BeamAggregationTransforms; import org.apache.beam.dsls.sql.utils.CalciteUtils; -import org.apache.beam.sdk.coders.IterableCoder; import org.apache.beam.sdk.coders.KvCoder; import org.apache.beam.sdk.transforms.Combine; -import org.apache.beam.sdk.transforms.GroupByKey; import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.transforms.WithKeys; import 
org.apache.beam.sdk.transforms.WithTimestamps; @@ -93,25 +91,21 @@ public class BeamAggregationRel extends Aggregate implements BeamRelNode { .accumulatingFiredPanes()); BeamSqlRowCoder keyCoder = new BeamSqlRowCoder(exKeyFieldsSchema(input.getRowType())); - PCollection<KV<BeamSqlRow, BeamSqlRow>> exGroupByStream = windowStream.apply( - stageName + "_exGroupBy", + PCollection<KV<BeamSqlRow, BeamSqlRow>> exCombineByStream = windowStream.apply( + stageName + "_exCombineBy", WithKeys .of(new BeamAggregationTransforms.AggregationGroupByKeyFn( windowFieldIdx, groupSet))) .setCoder(KvCoder.<BeamSqlRow, BeamSqlRow>of(keyCoder, upstream.getCoder())); - PCollection<KV<BeamSqlRow, Iterable<BeamSqlRow>>> groupedStream = exGroupByStream - .apply(stageName + "_groupBy", GroupByKey.<BeamSqlRow, BeamSqlRow>create()) - .setCoder(KvCoder.<BeamSqlRow, Iterable<BeamSqlRow>>of(keyCoder, - IterableCoder.<BeamSqlRow>of(upstream.getCoder()))); - BeamSqlRowCoder aggCoder = new BeamSqlRowCoder(exAggFieldsSchema()); - PCollection<KV<BeamSqlRow, BeamSqlRow>> aggregatedStream = groupedStream.apply( - stageName + "_aggregation", - Combine.<BeamSqlRow, BeamSqlRow, BeamSqlRow>groupedValues( + + PCollection<KV<BeamSqlRow, BeamSqlRow>> aggregatedStream = exCombineByStream.apply( + stageName + "_combineBy", + Combine.<BeamSqlRow, BeamSqlRow, BeamSqlRow>perKey( new BeamAggregationTransforms.AggregationCombineFn(getAggCallList(), CalciteUtils.toBeamRecordType(input.getRowType())))) - .setCoder(KvCoder.<BeamSqlRow, BeamSqlRow>of(keyCoder, aggCoder)); + .setCoder(KvCoder.of(keyCoder, aggCoder)); PCollection<BeamSqlRow> mergedStream = aggregatedStream.apply(stageName + "_mergeRecord", ParDo.of(new BeamAggregationTransforms.MergeAggregationRecord(
