Repository: flink Updated Branches: refs/heads/master 7ad489d87 -> a2580171d
[FLINK-6601] [table] Use time indicators in DataStreamLogicalWindowAggregateRule This closes #3924. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/a2580171 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/a2580171 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/a2580171 Branch: refs/heads/master Commit: a2580171dd6e9044c0694deea83a2a2f1f9eb1ee Parents: 7ad489d Author: twalthr <twal...@apache.org> Authored: Tue May 16 16:34:54 2017 +0200 Committer: twalthr <twal...@apache.org> Committed: Wed May 17 11:53:31 2017 +0200 ---------------------------------------------------------------------- .../DataStreamLogicalWindowAggregateRule.scala | 34 +++++++++++--------- .../scala/stream/sql/WindowAggregateTest.scala | 6 ++-- .../calcite/RelTimeIndicatorConverterTest.scala | 3 +- 3 files changed, 23 insertions(+), 20 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/a2580171/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamLogicalWindowAggregateRule.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamLogicalWindowAggregateRule.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamLogicalWindowAggregateRule.scala index 38de539..050e2cd 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamLogicalWindowAggregateRule.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamLogicalWindowAggregateRule.scala @@ -22,6 +22,7 @@ import java.math.{BigDecimal => JBigDecimal} import org.apache.calcite.rel.`type`.RelDataType import org.apache.calcite.rex._ +import org.apache.calcite.sql.`type`.SqlTypeName import org.apache.calcite.sql.fun.SqlStdOperatorTable import org.apache.flink.table.api.scala.{Session, Slide, Tumble} import org.apache.flink.table.api.{TableException, Window} @@ -33,31 +34,34 @@ import org.apache.flink.table.typeutils.TimeIntervalTypeInfo class DataStreamLogicalWindowAggregateRule extends LogicalWindowAggregateRule("DataStreamLogicalWindowAggregateRule") { - /** Returns a zero literal of the correct time type */ + /** Returns a reference to the time attribute with a time indicator type */ override private[table] def getInAggregateGroupExpression( rexBuilder: RexBuilder, - windowExpression: RexCall): RexNode = createZeroLiteral(rexBuilder, windowExpression) - - /** Returns a zero literal of the correct time type */ - override private[table] def getOutAggregateGroupExpression( - rexBuilder: RexBuilder, - windowExpression: RexCall): RexNode = createZeroLiteral(rexBuilder, windowExpression) - - private def createZeroLiteral( - rexBuilder: RexBuilder, windowExpression: RexCall): RexNode = { - val timeType = windowExpression.operands.get(0).getType - timeType match { + val timeAttribute = windowExpression.operands.get(0) + timeAttribute match { - case _ if FlinkTypeFactory.isTimeIndicatorType(timeType) => - rexBuilder.makeLiteral(0L, timeType, true) + case _ if FlinkTypeFactory.isTimeIndicatorType(timeAttribute.getType) => + timeAttribute case _ => - throw TableException(s"""Time attribute expected but $timeType encountered.""") + throw TableException( + s"""Time attribute expected but ${timeAttribute.getType} encountered.""") } } + /** Returns a zero literal of a timestamp type */ + override private[table] def getOutAggregateGroupExpression( + rexBuilder: RexBuilder, + windowExpression: RexCall): RexNode = { + + rexBuilder.makeLiteral( + 0L, + rexBuilder.getTypeFactory.createSqlType(SqlTypeName.TIMESTAMP), + true) + } + override private[table] def translateWindowExpression( windowExpr: RexCall, rowType: RelDataType): Window = { http://git-wip-us.apache.org/repos/asf/flink/blob/a2580171/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/WindowAggregateTest.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/WindowAggregateTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/WindowAggregateTest.scala index 3729ef0..2022db8 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/WindowAggregateTest.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/WindowAggregateTest.scala @@ -81,7 +81,7 @@ class WindowAggregateTest extends TableTestBase { unaryNode( "DataStreamCalc", streamTableNode(0), - term("select", "1970-01-01 00:00:00 AS $f0", "c", "a") + term("select", "rowtime", "c", "a") ), term("window", TumblingGroupWindow('w$, 'rowtime, 900000.millis)), term("select", @@ -109,7 +109,7 @@ class WindowAggregateTest extends TableTestBase { unaryNode( "DataStreamCalc", streamTableNode(0), - term("select", "1970-01-01 00:00:00 AS $f0", "c", "a") + term("select", "proctime", "c", "a") ), term("window", SlidingGroupWindow('w$, 'proctime, 3600000.millis, 900000.millis)), term("select", @@ -138,7 +138,7 @@ class WindowAggregateTest extends TableTestBase { unaryNode( "DataStreamCalc", streamTableNode(0), - term("select", "1970-01-01 00:00:00 AS $f0", "c", "a") + term("select", "proctime", "c", "a") ), term("window", SessionGroupWindow('w$, 'proctime, 900000.millis)), term("select", http://git-wip-us.apache.org/repos/asf/flink/blob/a2580171/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/calcite/RelTimeIndicatorConverterTest.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/calcite/RelTimeIndicatorConverterTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/calcite/RelTimeIndicatorConverterTest.scala index cf55d48..8963ee2 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/calcite/RelTimeIndicatorConverterTest.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/calcite/RelTimeIndicatorConverterTest.scala @@ -299,8 +299,7 @@ class RelTimeIndicatorConverterTest extends TableTestBase { unaryNode( "DataStreamCalc", streamTableNode(0), - term("select", "long", "1970-01-01 00:00:00 AS $f1", - "TIME_MATERIALIZATION(rowtime) AS $f2") + term("select", "long", "rowtime", "TIME_MATERIALIZATION(rowtime) AS $f2") ), term("groupBy", "long"), term(