Repository: beam
Updated Branches:
  refs/heads/DSL_SQL aa265e62a -> e38cf43df
support TUMBLE/HOP/SESSION _START function Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/4391e1dd Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/4391e1dd Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/4391e1dd Branch: refs/heads/DSL_SQL Commit: 4391e1dd895c6fb0aa2d3600792415ad2d041c5b Parents: aa265e6 Author: mingmxu <[email protected]> Authored: Sun Jul 9 00:52:23 2017 -0700 Committer: Tyler Akidau <[email protected]> Committed: Wed Jul 12 09:35:50 2017 -0700 ---------------------------------------------------------------------- .../interpreter/operator/BeamSqlPrimitive.java | 4 +++ .../beam/dsls/sql/rel/BeamAggregationRel.java | 2 +- .../transform/BeamAggregationTransforms.java | 8 ++++- .../dsls/sql/BeamSqlDslAggregationTest.java | 35 +++++++++++++++----- .../transform/BeamAggregationTransformTest.java | 2 +- 5 files changed, 39 insertions(+), 12 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/4391e1dd/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlPrimitive.java ---------------------------------------------------------------------- diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlPrimitive.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlPrimitive.java index 92d1263..c5c80b9 100644 --- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlPrimitive.java +++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlPrimitive.java @@ -97,6 +97,10 @@ public class BeamSqlPrimitive<T> extends BeamSqlExpression { return (Date) getValue(); } + public BigDecimal getDecimal() { + return (BigDecimal) getValue(); + } + @Override public boolean accept() { if (value == null) { 
http://git-wip-us.apache.org/repos/asf/beam/blob/4391e1dd/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamAggregationRel.java ---------------------------------------------------------------------- diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamAggregationRel.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamAggregationRel.java index 9bb2902..5389ec7 100644 --- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamAggregationRel.java +++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/rel/BeamAggregationRel.java @@ -110,7 +110,7 @@ public class BeamAggregationRel extends Aggregate implements BeamRelNode { PCollection<BeamSqlRow> mergedStream = aggregatedStream.apply(stageName + "mergeRecord", ParDo.of(new BeamAggregationTransforms.MergeAggregationRecord( - CalciteUtils.toBeamRecordType(getRowType()), getAggCallList()))); + CalciteUtils.toBeamRecordType(getRowType()), getAggCallList(), windowFieldIdx))); mergedStream.setCoder(new BeamSqlRowCoder(CalciteUtils.toBeamRecordType(getRowType()))); return mergedStream; http://git-wip-us.apache.org/repos/asf/beam/blob/4391e1dd/dsls/sql/src/main/java/org/apache/beam/dsls/sql/transform/BeamAggregationTransforms.java ---------------------------------------------------------------------- diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/transform/BeamAggregationTransforms.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/transform/BeamAggregationTransforms.java index 9c0b4a3..41e5837 100644 --- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/transform/BeamAggregationTransforms.java +++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/transform/BeamAggregationTransforms.java @@ -59,13 +59,16 @@ public class BeamAggregationTransforms implements Serializable{ public static class MergeAggregationRecord extends DoFn<KV<BeamSqlRow, BeamSqlRow>, BeamSqlRow> { private BeamSqlRecordType outRecordType; private List<String> aggFieldNames; + private int windowFieldIdx; - 
public MergeAggregationRecord(BeamSqlRecordType outRecordType, List<AggregateCall> aggList) { + public MergeAggregationRecord(BeamSqlRecordType outRecordType, List<AggregateCall> aggList + , int windowFieldIdx) { this.outRecordType = outRecordType; this.aggFieldNames = new ArrayList<>(); for (AggregateCall ac : aggList) { aggFieldNames.add(ac.getName()); } + this.windowFieldIdx = windowFieldIdx; } @ProcessElement @@ -80,6 +83,9 @@ public class BeamAggregationTransforms implements Serializable{ for (int idx = 0; idx < aggFieldNames.size(); ++idx) { outRecord.addField(aggFieldNames.get(idx), kvRecord.getValue().getFieldValue(idx)); } + if (windowFieldIdx != -1) { + outRecord.addField(windowFieldIdx, outRecord.getWindowStart().toDate()); + } c.output(outRecord); } http://git-wip-us.apache.org/repos/asf/beam/blob/4391e1dd/dsls/sql/src/test/java/org/apache/beam/dsls/sql/BeamSqlDslAggregationTest.java ---------------------------------------------------------------------- diff --git a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/BeamSqlDslAggregationTest.java b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/BeamSqlDslAggregationTest.java index f92c803..ac0b1cb 100644 --- a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/BeamSqlDslAggregationTest.java +++ b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/BeamSqlDslAggregationTest.java @@ -159,24 +159,29 @@ public class BeamSqlDslAggregationTest extends BeamSqlDslBase { */ @Test public void testTumbleWindow() throws Exception { - String sql = "SELECT f_int2, COUNT(*) AS `size` FROM TABLE_A " + String sql = "SELECT f_int2, COUNT(*) AS `size`," + + " TUMBLE_START(f_timestamp, INTERVAL '1' HOUR) AS `window_start`" + + " FROM TABLE_A " + "GROUP BY f_int2, TUMBLE(f_timestamp, INTERVAL '1' HOUR)"; PCollection<BeamSqlRow> result = PCollectionTuple.of(new TupleTag<BeamSqlRow>("TABLE_A"), inputA1) .apply("testTumbleWindow", BeamSql.query(sql)); - BeamSqlRecordType resultType = BeamSqlRecordType.create(Arrays.asList("f_int2", 
"size"), - Arrays.asList(Types.INTEGER, Types.BIGINT)); + BeamSqlRecordType resultType = BeamSqlRecordType.create( + Arrays.asList("f_int2", "size", "window_start"), + Arrays.asList(Types.INTEGER, Types.BIGINT, Types.TIMESTAMP)); BeamSqlRow record1 = new BeamSqlRow(resultType); record1.addField("f_int2", 0); record1.addField("size", 3L); + record1.addField("window_start", FORMAT.parse("2017-01-01 01:00:00")); record1.setWindowStart(new Instant(FORMAT.parse("2017-01-01 01:00:00").getTime())); record1.setWindowEnd(new Instant(FORMAT.parse("2017-01-01 02:00:00").getTime())); BeamSqlRow record2 = new BeamSqlRow(resultType); record2.addField("f_int2", 0); record2.addField("size", 1L); + record2.addField("window_start", FORMAT.parse("2017-01-01 02:00:00")); record2.setWindowStart(new Instant(FORMAT.parse("2017-01-01 02:00:00").getTime())); record2.setWindowEnd(new Instant(FORMAT.parse("2017-01-01 03:00:00").getTime())); @@ -190,35 +195,42 @@ public class BeamSqlDslAggregationTest extends BeamSqlDslBase { */ @Test public void testHopWindow() throws Exception { - String sql = "SELECT f_int2, COUNT(*) AS `size` FROM PCOLLECTION " + String sql = "SELECT f_int2, COUNT(*) AS `size`," + + " HOP_START(f_timestamp, INTERVAL '1' HOUR, INTERVAL '30' MINUTE) AS `window_start`" + + " FROM PCOLLECTION " + "GROUP BY f_int2, HOP(f_timestamp, INTERVAL '1' HOUR, INTERVAL '30' MINUTE)"; PCollection<BeamSqlRow> result = inputA1.apply("testHopWindow", BeamSql.simpleQuery(sql)); - BeamSqlRecordType resultType = BeamSqlRecordType.create(Arrays.asList("f_int2", "size"), - Arrays.asList(Types.INTEGER, Types.BIGINT)); + BeamSqlRecordType resultType = BeamSqlRecordType.create( + Arrays.asList("f_int2", "size", "window_start"), + Arrays.asList(Types.INTEGER, Types.BIGINT, Types.TIMESTAMP)); BeamSqlRow record1 = new BeamSqlRow(resultType); record1.addField("f_int2", 0); record1.addField("size", 3L); + record1.addField("window_start", FORMAT.parse("2017-01-01 00:30:00")); record1.setWindowStart(new 
Instant(FORMAT.parse("2017-01-01 00:30:00").getTime())); record1.setWindowEnd(new Instant(FORMAT.parse("2017-01-01 01:30:00").getTime())); BeamSqlRow record2 = new BeamSqlRow(resultType); record2.addField("f_int2", 0); record2.addField("size", 3L); + record2.addField("window_start", FORMAT.parse("2017-01-01 01:00:00")); record2.setWindowStart(new Instant(FORMAT.parse("2017-01-01 01:00:00").getTime())); record2.setWindowEnd(new Instant(FORMAT.parse("2017-01-01 02:00:00").getTime())); BeamSqlRow record3 = new BeamSqlRow(resultType); record3.addField("f_int2", 0); record3.addField("size", 1L); + record3.addField("window_start", FORMAT.parse("2017-01-01 01:30:00")); record3.setWindowStart(new Instant(FORMAT.parse("2017-01-01 01:30:00").getTime())); record3.setWindowEnd(new Instant(FORMAT.parse("2017-01-01 02:30:00").getTime())); BeamSqlRow record4 = new BeamSqlRow(resultType); record4.addField("f_int2", 0); record4.addField("size", 1L); + record4.addField("window_start", FORMAT.parse("2017-01-01 02:00:00")); record4.setWindowStart(new Instant(FORMAT.parse("2017-01-01 02:00:00").getTime())); record4.setWindowEnd(new Instant(FORMAT.parse("2017-01-01 03:00:00").getTime())); @@ -232,24 +244,29 @@ public class BeamSqlDslAggregationTest extends BeamSqlDslBase { */ @Test public void testSessionWindow() throws Exception { - String sql = "SELECT f_int2, COUNT(*) AS `size` FROM TABLE_A " + String sql = "SELECT f_int2, COUNT(*) AS `size`," + + " SESSION_START(f_timestamp, INTERVAL '5' MINUTE) AS `window_start`" + + " FROM TABLE_A " + "GROUP BY f_int2, SESSION(f_timestamp, INTERVAL '5' MINUTE)"; PCollection<BeamSqlRow> result = PCollectionTuple.of(new TupleTag<BeamSqlRow>("TABLE_A"), inputA1) .apply("testSessionWindow", BeamSql.query(sql)); - BeamSqlRecordType resultType = BeamSqlRecordType.create(Arrays.asList("f_int2", "size"), - Arrays.asList(Types.INTEGER, Types.BIGINT)); + BeamSqlRecordType resultType = BeamSqlRecordType.create( + Arrays.asList("f_int2", "size", 
"window_start"), + Arrays.asList(Types.INTEGER, Types.BIGINT, Types.TIMESTAMP)); BeamSqlRow record1 = new BeamSqlRow(resultType); record1.addField("f_int2", 0); record1.addField("size", 3L); + record1.addField("window_start", FORMAT.parse("2017-01-01 01:01:03")); record1.setWindowStart(new Instant(FORMAT.parse("2017-01-01 01:01:03").getTime())); record1.setWindowEnd(new Instant(FORMAT.parse("2017-01-01 01:11:03").getTime())); BeamSqlRow record2 = new BeamSqlRow(resultType); record2.addField("f_int2", 0); record2.addField("size", 1L); + record2.addField("window_start", FORMAT.parse("2017-01-01 02:04:03")); record2.setWindowStart(new Instant(FORMAT.parse("2017-01-01 02:04:03").getTime())); record2.setWindowEnd(new Instant(FORMAT.parse("2017-01-01 02:09:03").getTime())); http://git-wip-us.apache.org/repos/asf/beam/blob/4391e1dd/dsls/sql/src/test/java/org/apache/beam/dsls/sql/schema/transform/BeamAggregationTransformTest.java ---------------------------------------------------------------------- diff --git a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/schema/transform/BeamAggregationTransformTest.java b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/schema/transform/BeamAggregationTransformTest.java index 2b01254..a0fed22 100644 --- a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/schema/transform/BeamAggregationTransformTest.java +++ b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/schema/transform/BeamAggregationTransformTest.java @@ -122,7 +122,7 @@ public class BeamAggregationTransformTest extends BeamTransformBaseTest{ //4. flat KV to a single record PCollection<BeamSqlRow> mergedStream = aggregatedStream.apply("mergeRecord", - ParDo.of(new BeamAggregationTransforms.MergeAggregationRecord(outputType, aggCalls))); + ParDo.of(new BeamAggregationTransforms.MergeAggregationRecord(outputType, aggCalls, -1))); mergedStream.setCoder(outRecordCoder); //assert function BeamAggregationTransform.AggregationGroupByKeyFn
