Repository: beam
Updated Branches:
  refs/heads/DSL_SQL bf39926b4 -> 8e9b930bc


[BEAM-2477] BeamAggregationRel should use Combine.perKey instead of GroupByKey


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/890dd03d
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/890dd03d
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/890dd03d

Branch: refs/heads/DSL_SQL
Commit: 890dd03d6d3643a6ac39d041038cc110d6317ef9
Parents: bf39926
Author: JingsongLi <[email protected]>
Authored: Tue Jun 20 09:20:19 2017 +0800
Committer: Tyler Akidau <[email protected]>
Committed: Thu Jun 22 13:27:53 2017 -0700

----------------------------------------------------------------------
 .../beam/dsls/sql/rel/BeamAggregationRel.java   | 20 +++++++-------------
 1 file changed, 7 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/890dd03d/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamAggregationRel.java
----------------------------------------------------------------------
diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamAggregationRel.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamAggregationRel.java
index 7a1d003a..701f620 100644
--- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamAggregationRel.java
+++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamAggregationRel.java
@@ -25,10 +25,8 @@ import org.apache.beam.dsls.sql.schema.BeamSqlRow;
 import org.apache.beam.dsls.sql.schema.BeamSqlRowCoder;
 import org.apache.beam.dsls.sql.transform.BeamAggregationTransforms;
 import org.apache.beam.dsls.sql.utils.CalciteUtils;
-import org.apache.beam.sdk.coders.IterableCoder;
 import org.apache.beam.sdk.coders.KvCoder;
 import org.apache.beam.sdk.transforms.Combine;
-import org.apache.beam.sdk.transforms.GroupByKey;
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.transforms.WithKeys;
 import org.apache.beam.sdk.transforms.WithTimestamps;
@@ -93,25 +91,21 @@ public class BeamAggregationRel extends Aggregate implements BeamRelNode {
         .accumulatingFiredPanes());
 
     BeamSqlRowCoder keyCoder = new 
BeamSqlRowCoder(exKeyFieldsSchema(input.getRowType()));
-    PCollection<KV<BeamSqlRow, BeamSqlRow>> exGroupByStream = 
windowStream.apply(
-        stageName + "_exGroupBy",
+    PCollection<KV<BeamSqlRow, BeamSqlRow>> exCombineByStream = 
windowStream.apply(
+        stageName + "_exCombineBy",
         WithKeys
             .of(new BeamAggregationTransforms.AggregationGroupByKeyFn(
                 windowFieldIdx, groupSet)))
         .setCoder(KvCoder.<BeamSqlRow, BeamSqlRow>of(keyCoder, 
upstream.getCoder()));
 
-    PCollection<KV<BeamSqlRow, Iterable<BeamSqlRow>>> groupedStream = 
exGroupByStream
-        .apply(stageName + "_groupBy", GroupByKey.<BeamSqlRow, 
BeamSqlRow>create())
-        .setCoder(KvCoder.<BeamSqlRow, Iterable<BeamSqlRow>>of(keyCoder,
-            IterableCoder.<BeamSqlRow>of(upstream.getCoder())));
-
     BeamSqlRowCoder aggCoder = new BeamSqlRowCoder(exAggFieldsSchema());
-    PCollection<KV<BeamSqlRow, BeamSqlRow>> aggregatedStream = 
groupedStream.apply(
-        stageName + "_aggregation",
-        Combine.<BeamSqlRow, BeamSqlRow, BeamSqlRow>groupedValues(
+
+    PCollection<KV<BeamSqlRow, BeamSqlRow>> aggregatedStream = 
exCombineByStream.apply(
+        stageName + "_combineBy",
+        Combine.<BeamSqlRow, BeamSqlRow, BeamSqlRow>perKey(
             new 
BeamAggregationTransforms.AggregationCombineFn(getAggCallList(),
                 CalciteUtils.toBeamRecordType(input.getRowType()))))
-        .setCoder(KvCoder.<BeamSqlRow, BeamSqlRow>of(keyCoder, aggCoder));
+        .setCoder(KvCoder.of(keyCoder, aggCoder));
 
     PCollection<BeamSqlRow> mergedStream = aggregatedStream.apply(stageName + 
"_mergeRecord",
         ParDo.of(new BeamAggregationTransforms.MergeAggregationRecord(

Reply via email to