Repository: flink Updated Branches: refs/heads/release-1.3 45923ffb8 -> 51fb7ed79
[FLINK-6614] [table] Fix translation of group auxiliary functions (e.g., TUMBLE_END). Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/51fb7ed7 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/51fb7ed7 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/51fb7ed7 Branch: refs/heads/release-1.3 Commit: 51fb7ed791bc8c5c1c35dffcd9855a2e5a8f3087 Parents: 0246ce5 Author: Fabian Hueske <fhue...@apache.org> Authored: Wed May 17 16:26:27 2017 +0200 Committer: Fabian Hueske <fhue...@apache.org> Committed: Thu May 18 22:06:00 2017 +0200 ---------------------------------------------------------------------- .../common/WindowStartEndPropertiesRule.scala | 39 ++++++++++++++------ .../scala/stream/sql/WindowAggregateTest.scala | 27 ++++++++++++++ 2 files changed, 54 insertions(+), 12 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/51fb7ed7/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/common/WindowStartEndPropertiesRule.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/common/WindowStartEndPropertiesRule.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/common/WindowStartEndPropertiesRule.scala index 7577deb..14e9b21 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/common/WindowStartEndPropertiesRule.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/common/WindowStartEndPropertiesRule.scala @@ -36,13 +36,21 @@ class WindowStartEndPropertiesRule override def matches(call: RelOptRuleCall): Boolean = { val project = call.rel(0).asInstanceOf[LogicalProject] // project includes at least on group auxiliary function - project.getProjects.exists { - case c: RexCall => c.getOperator.isGroupAuxiliary - case _ => false + + def hasGroupAuxiliaries(node: RexNode): Boolean = { + node match { + case c: RexCall if c.getOperator.isGroupAuxiliary => true + case c: RexCall => + c.operands.exists(hasGroupAuxiliaries) + case _ => false + } } + + project.getProjects.exists(hasGroupAuxiliaries) } override def onMatch(call: RelOptRuleCall): Unit = { + val project = call.rel(0).asInstanceOf[LogicalProject] val innerProject = call.rel(1).asInstanceOf[LogicalProject] val agg = call.rel(2).asInstanceOf[LogicalWindowAggregate] @@ -62,20 +70,27 @@ class WindowStartEndPropertiesRule transformed.project( innerProject.getProjects ++ Seq(transformed.field("w$start"), transformed.field("w$end"))) - // replace window auxiliary function by access to window properties - transformed.project( - project.getProjects.map{ x => - if (WindowStartEndPropertiesRule.isWindowStart(x)) { + def replaceGroupAuxiliaries(node: RexNode): RexNode = { + node match { + case c: RexCall if WindowStartEndPropertiesRule.isWindowStart(c) => // replace expression by access to window start - rexBuilder.makeCast(x.getType, transformed.field("w$start"), false) - } else if (WindowStartEndPropertiesRule.isWindowEnd(x)) { + rexBuilder.makeCast(c.getType, transformed.field("w$start"), false) + case c: RexCall if WindowStartEndPropertiesRule.isWindowEnd(c) => // replace expression by access to window end - rexBuilder.makeCast(x.getType, transformed.field("w$end"), false) - } else { + rexBuilder.makeCast(c.getType, transformed.field("w$end"), false) + case c: RexCall => + // replace expressions in children + val newOps = c.getOperands.map(replaceGroupAuxiliaries) + c.clone(c.getType, newOps) + case x => // preserve expression x - } } + } + + // replace window auxiliary function by access to window properties + transformed.project( + project.getProjects.map(replaceGroupAuxiliaries) ) val res = transformed.build() call.transformTo(res) http://git-wip-us.apache.org/repos/asf/flink/blob/51fb7ed7/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/WindowAggregateTest.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/WindowAggregateTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/WindowAggregateTest.scala index 2022db8..f95d0ab 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/WindowAggregateTest.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/WindowAggregateTest.scala @@ -150,6 +150,33 @@ class WindowAggregateTest extends TableTestBase { streamUtil.verifySql(sql, expected) } + @Test + def testExpressionOnWindowAuxFunction() = { + val sql = + "SELECT " + + " COUNT(*), " + + " TUMBLE_END(rowtime, INTERVAL '15' MINUTE) + INTERVAL '1' MINUTE " + + "FROM MyTable " + + "GROUP BY TUMBLE(rowtime, INTERVAL '15' MINUTE)" + val expected = + unaryNode( + "DataStreamCalc", + unaryNode( + "DataStreamGroupWindowAggregate", + unaryNode( + "DataStreamCalc", + streamTableNode(0), + term("select", "rowtime") + ), + term("window", TumblingGroupWindow('w$, 'rowtime, 900000.millis)), + term("select", "COUNT(*) AS EXPR$0", "start('w$) AS w$start", "end('w$) AS w$end") + ), + term("select", "EXPR$0", "DATETIME_PLUS(w$end, 60000) AS $f1") + ) + + streamUtil.verifySql(sql, expected) + } + @Test(expected = classOf[TableException]) def testTumbleWindowNoOffset(): Unit = { val sqlQuery =