[FLINK-7357] [table] Fix translation of group window queries with window properties and a HAVING clause.
This closes #4521. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/df5efe9c Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/df5efe9c Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/df5efe9c Branch: refs/heads/master Commit: df5efe9cead172994abb2fd4858a27caacd9468c Parents: 73a2443 Author: Rong Rong <ro...@uber.com> Authored: Thu Aug 10 10:46:25 2017 -0700 Committer: Fabian Hueske <fhue...@apache.org> Committed: Wed Sep 20 10:12:13 2017 +0200 ---------------------------------------------------------------------- .../flink/table/plan/rules/FlinkRuleSets.scala | 4 +- .../common/WindowStartEndPropertiesRule.scala | 169 ++++++++++++------- .../table/api/batch/sql/GroupWindowTest.scala | 41 +++++ .../table/api/stream/sql/GroupWindowTest.scala | 38 +++++ .../table/runtime/stream/sql/SqlITCase.scala | 51 ++++++ 5 files changed, 241 insertions(+), 62 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/df5efe9c/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/FlinkRuleSets.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/FlinkRuleSets.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/FlinkRuleSets.scala index a81c7d2..073a8cc 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/FlinkRuleSets.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/FlinkRuleSets.scala @@ -147,7 +147,8 @@ object FlinkRuleSets { // Transform window to LogicalWindowAggregate DataSetLogicalWindowAggregateRule.INSTANCE, - WindowStartEndPropertiesRule.INSTANCE + WindowStartEndPropertiesRule.INSTANCE, + WindowStartEndPropertiesHavingRule.INSTANCE ) /** @@ -179,6 +180,7 @@ object 
FlinkRuleSets { // Transform window to LogicalWindowAggregate DataStreamLogicalWindowAggregateRule.INSTANCE, WindowStartEndPropertiesRule.INSTANCE, + WindowStartEndPropertiesHavingRule.INSTANCE, // simplify expressions rules ReduceExpressionsRule.FILTER_INSTANCE, http://git-wip-us.apache.org/repos/asf/flink/blob/df5efe9c/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/common/WindowStartEndPropertiesRule.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/common/WindowStartEndPropertiesRule.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/common/WindowStartEndPropertiesRule.scala index 14e9b21..33190e6 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/common/WindowStartEndPropertiesRule.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/common/WindowStartEndPropertiesRule.scala @@ -18,20 +18,19 @@ package org.apache.flink.table.plan.rules.common -import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall} -import org.apache.calcite.rel.logical.LogicalProject +import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall, RelOptRuleOperand} +import org.apache.calcite.rel.logical.{LogicalFilter, LogicalProject} import org.apache.calcite.rex.{RexCall, RexNode} import org.apache.calcite.sql.fun.SqlStdOperatorTable +import org.apache.calcite.tools.RelBuilder import org.apache.flink.table.calcite.FlinkRelBuilder.NamedWindowProperty import org.apache.flink.table.expressions.{WindowEnd, WindowStart} import org.apache.flink.table.plan.logical.rel.LogicalWindowAggregate import scala.collection.JavaConversions._ -class WindowStartEndPropertiesRule - extends RelOptRule( - WindowStartEndPropertiesRule.WINDOW_EXPRESSION_RULE_PREDICATE, - "WindowStartEndPropertiesRule") { +abstract class 
WindowStartEndPropertiesBaseRule(rulePredicate: RelOptRuleOperand, ruleName: String) + extends RelOptRule(rulePredicate, ruleName) { override def matches(call: RelOptRuleCall): Boolean = { val project = call.rel(0).asInstanceOf[LogicalProject] @@ -49,61 +48,24 @@ class WindowStartEndPropertiesRule project.getProjects.exists(hasGroupAuxiliaries) } - override def onMatch(call: RelOptRuleCall): Unit = { - - val project = call.rel(0).asInstanceOf[LogicalProject] - val innerProject = call.rel(1).asInstanceOf[LogicalProject] - val agg = call.rel(2).asInstanceOf[LogicalWindowAggregate] - - // Retrieve window start and end properties - val transformed = call.builder() - val rexBuilder = transformed.getRexBuilder - transformed.push(LogicalWindowAggregate.create( - agg.getWindow, - Seq( - NamedWindowProperty("w$start", WindowStart(agg.getWindow.aliasAttribute)), - NamedWindowProperty("w$end", WindowEnd(agg.getWindow.aliasAttribute)) - ), agg) - ) - - // forward window start and end properties - transformed.project( - innerProject.getProjects ++ Seq(transformed.field("w$start"), transformed.field("w$end"))) - - def replaceGroupAuxiliaries(node: RexNode): RexNode = { - node match { - case c: RexCall if WindowStartEndPropertiesRule.isWindowStart(c) => - // replace expression by access to window start - rexBuilder.makeCast(c.getType, transformed.field("w$start"), false) - case c: RexCall if WindowStartEndPropertiesRule.isWindowEnd(c) => - // replace expression by access to window end - rexBuilder.makeCast(c.getType, transformed.field("w$end"), false) - case c: RexCall => - // replace expressions in children - val newOps = c.getOperands.map(replaceGroupAuxiliaries) - c.clone(c.getType, newOps) - case x => - // preserve expression - x - } + def replaceGroupAuxiliaries(node: RexNode, relBuilder: RelBuilder): RexNode = { + val rexBuilder = relBuilder.getRexBuilder + node match { + case c: RexCall if isWindowStart(c) => + // replace expression by access to window start + 
rexBuilder.makeCast(c.getType, relBuilder.field("w$start"), false) + case c: RexCall if isWindowEnd(c) => + // replace expression by access to window end + rexBuilder.makeCast(c.getType, relBuilder.field("w$end"), false) + case c: RexCall => + // replace expressions in children + val newOps = c.getOperands.map(x => replaceGroupAuxiliaries(x, relBuilder)) + c.clone(c.getType, newOps) + case x => + // preserve expression + x } - - // replace window auxiliary function by access to window properties - transformed.project( - project.getProjects.map(replaceGroupAuxiliaries) - ) - val res = transformed.build() - call.transformTo(res) } -} - -object WindowStartEndPropertiesRule { - private val WINDOW_EXPRESSION_RULE_PREDICATE = - RelOptRule.operand(classOf[LogicalProject], - RelOptRule.operand(classOf[LogicalProject], - RelOptRule.operand(classOf[LogicalWindowAggregate], RelOptRule.none()))) - - val INSTANCE = new WindowStartEndPropertiesRule /** Checks if a RexNode is a window start auxiliary function. 
*/ private def isWindowStart(node: RexNode): Boolean = { @@ -113,7 +75,7 @@ object WindowStartEndPropertiesRule { case SqlStdOperatorTable.TUMBLE_START | SqlStdOperatorTable.HOP_START | SqlStdOperatorTable.SESSION_START - => true + => true case _ => false } case _ => false @@ -128,10 +90,95 @@ object WindowStartEndPropertiesRule { case SqlStdOperatorTable.TUMBLE_END | SqlStdOperatorTable.HOP_END | SqlStdOperatorTable.SESSION_END - => true + => true case _ => false } case _ => false } } } + +object WindowStartEndPropertiesRule { + + val INSTANCE = new WindowStartEndPropertiesBaseRule( + RelOptRule.operand(classOf[LogicalProject], + RelOptRule.operand(classOf[LogicalProject], + RelOptRule.operand(classOf[LogicalWindowAggregate], RelOptRule.none()))), + "WindowStartEndPropertiesRule") { + + override def onMatch(call: RelOptRuleCall): Unit = { + + val project = call.rel(0).asInstanceOf[LogicalProject] + val innerProject = call.rel(1).asInstanceOf[LogicalProject] + val agg = call.rel(2).asInstanceOf[LogicalWindowAggregate] + + // Retrieve window start and end properties + val builder = call.builder() + builder.push(LogicalWindowAggregate.create( + agg.getWindow, + Seq( + NamedWindowProperty("w$start", WindowStart(agg.getWindow.aliasAttribute)), + NamedWindowProperty("w$end", WindowEnd(agg.getWindow.aliasAttribute))), + agg) + ) + + // forward window start and end properties + builder.project( + innerProject.getProjects ++ Seq(builder.field("w$start"), builder.field("w$end"))) + + // replace window auxiliary function by access to window properties + builder.project( + project.getProjects.map(expr => replaceGroupAuxiliaries(expr, builder)) + ) + val res = builder.build() + call.transformTo(res) + } + } +} + +object WindowStartEndPropertiesHavingRule { + + val INSTANCE = new WindowStartEndPropertiesBaseRule( + RelOptRule.operand(classOf[LogicalProject], + RelOptRule.operand(classOf[LogicalFilter], + RelOptRule.operand(classOf[LogicalProject], + 
RelOptRule.operand(classOf[LogicalWindowAggregate], RelOptRule.none())))), + "WindowStartEndPropertiesHavingRule") { + + override def onMatch(call: RelOptRuleCall): Unit = { + + val project = call.rel(0).asInstanceOf[LogicalProject] + val filter = call.rel(1).asInstanceOf[LogicalFilter] + val innerProject = call.rel(2).asInstanceOf[LogicalProject] + val agg = call.rel(3).asInstanceOf[LogicalWindowAggregate] + + // Retrieve window start and end properties + val builder = call.builder() + builder.push(LogicalWindowAggregate.create( + agg.getWindow, + Seq( + NamedWindowProperty("w$start", WindowStart(agg.getWindow.aliasAttribute)), + NamedWindowProperty("w$end", WindowEnd(agg.getWindow.aliasAttribute))), + agg) + ) + + // forward window start and end properties + builder.project( + innerProject.getProjects ++ Seq(builder.field("w$start"), builder.field("w$end"))) + + // replace window auxiliary function by access to window properties + builder.filter( + filter.getChildExps.map(expr => replaceGroupAuxiliaries(expr, builder)) + ) + + // replace window auxiliary function by access to window properties + builder.project( + project.getProjects.map(expr => replaceGroupAuxiliaries(expr, builder)) + ) + + val res = builder.build() + call.transformTo(res) + } + } +} + http://git-wip-us.apache.org/repos/asf/flink/blob/df5efe9c/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/sql/GroupWindowTest.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/sql/GroupWindowTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/sql/GroupWindowTest.scala index e77087c..a78aa8c 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/sql/GroupWindowTest.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/sql/GroupWindowTest.scala @@ -256,4 +256,45 @@ 
class GroupWindowTest extends TableTestBase { util.verifySql(sqlQuery, expected) } + + @Test + def testExpressionOnWindowHavingFunction() = { + val util = batchTestUtil() + util.addTable[(Int, Long, String, Timestamp)]("T", 'a, 'b, 'c, 'ts) + + val sql = + "SELECT " + + " COUNT(*), " + + " HOP_START(ts, INTERVAL '15' MINUTE, INTERVAL '1' MINUTE) " + + "FROM T " + + "GROUP BY HOP(ts, INTERVAL '15' MINUTE, INTERVAL '1' MINUTE) " + + "HAVING " + + " SUM(a) > 0 AND " + + " QUARTER(HOP_START(ts, INTERVAL '15' MINUTE, INTERVAL '1' MINUTE)) = 1" + + val expected = + unaryNode( + "DataSetCalc", + unaryNode( + "DataSetWindowAggregate", + unaryNode( + "DataSetCalc", + batchTableNode(0), + term("select", "ts, a") + ), + term("window", SlidingGroupWindow('w$, 'ts, 60000.millis, 900000.millis)), + term("select", + "COUNT(*) AS EXPR$0", + "SUM(a) AS $f1", + "start('w$) AS w$start", + "end('w$) AS w$end") + ), + term("select", "EXPR$0", "CAST(w$start) AS w$start"), + term("where", + "AND(>($f1, 0), " + + "=(EXTRACT_DATE(FLAG(QUARTER), /INT(Reinterpret(CAST(w$start)), 86400000)), 1))") + ) + + util.verifySql(sql, expected) + } } http://git-wip-us.apache.org/repos/asf/flink/blob/df5efe9c/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/GroupWindowTest.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/GroupWindowTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/GroupWindowTest.scala index 4823903..722c4f0 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/GroupWindowTest.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/GroupWindowTest.scala @@ -143,4 +143,42 @@ class GroupWindowTest extends TableTestBase { streamUtil.verifySql(sql, expected) } + + @Test + def testExpressionOnWindowHavingFunction() = { + 
val sql = + "SELECT " + + " COUNT(*), " + + " HOP_START(rowtime, INTERVAL '15' MINUTE, INTERVAL '1' MINUTE) " + + "FROM MyTable " + + "GROUP BY HOP(rowtime, INTERVAL '15' MINUTE, INTERVAL '1' MINUTE) " + + "HAVING " + + " SUM(a) > 0 AND " + + " QUARTER(HOP_START(rowtime, INTERVAL '15' MINUTE, INTERVAL '1' MINUTE)) = 1" + + val expected = + unaryNode( + "DataStreamCalc", + unaryNode( + "DataStreamGroupWindowAggregate", + unaryNode( + "DataStreamCalc", + streamTableNode(0), + term("select", "rowtime, a") + ), + term("window", SlidingGroupWindow('w$, 'rowtime, 60000.millis, 900000.millis)), + term("select", + "COUNT(*) AS EXPR$0", + "SUM(a) AS $f1", + "start('w$) AS w$start", + "end('w$) AS w$end") + ), + term("select", "EXPR$0", "w$start"), + term("where", + "AND(>($f1, 0), " + + "=(EXTRACT_DATE(FLAG(QUARTER), /INT(Reinterpret(w$start), 86400000)), 1))") + ) + + streamUtil.verifySql(sql, expected) + } } http://git-wip-us.apache.org/repos/asf/flink/blob/df5efe9c/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/SqlITCase.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/SqlITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/SqlITCase.scala index d2f9a9a..5398c6d 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/SqlITCase.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/SqlITCase.scala @@ -21,9 +21,11 @@ package org.apache.flink.table.runtime.stream.sql import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation} import org.apache.flink.api.java.typeutils.RowTypeInfo import org.apache.flink.api.scala._ +import org.apache.flink.streaming.api.TimeCharacteristic import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment import 
org.apache.flink.table.api.TableEnvironment import org.apache.flink.table.api.scala._ +import org.apache.flink.table.runtime.utils.TimeTestUtil.EventTimeSourceFunction import org.apache.flink.table.runtime.utils.{StreamITCase, StreamTestData, StreamingWithStateTestBase} import org.apache.flink.types.Row import org.junit.Assert._ @@ -314,5 +316,54 @@ class SqlITCase extends StreamingWithStateTestBase { assertEquals(expected.sorted, StreamITCase.testResults.sorted) } + @Test + def testHopStartEndWithHaving(): Unit = { + val env = StreamExecutionEnvironment.getExecutionEnvironment + val tEnv = TableEnvironment.getTableEnvironment(env) + env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) + env.setStateBackend(getStateBackend) + StreamITCase.clear + env.setParallelism(1) + + val sqlQueryHopStartEndWithHaving = + """ + |SELECT + | c AS k, + | COUNT(a) AS v, + | HOP_START(rowtime, INTERVAL '1' MINUTE, INTERVAL '1' MINUTE) AS windowStart, + | HOP_END(rowtime, INTERVAL '1' MINUTE, INTERVAL '1' MINUTE) AS windowEnd + |FROM T1 + |GROUP BY HOP(rowtime, INTERVAL '1' MINUTE, INTERVAL '1' MINUTE), c + |HAVING + | SUM(b) > 1 AND + | QUARTER(HOP_START(rowtime, INTERVAL '1' MINUTE, INTERVAL '1' MINUTE)) = 1 + """.stripMargin + + val data = Seq( + Left(14000005L, (1, 1L, "Hi")), + Left(14000000L, (2, 1L, "Hello")), + Left(14000002L, (3, 1L, "Hello")), + Right(14000010L), + Left(8640000000L, (4, 1L, "Hello")), // data for the quarter to validate having filter + Left(8640000001L, (4, 1L, "Hello")), + Right(8640000010L) + ) + + val t1 = env.addSource(new EventTimeSourceFunction[(Int, Long, String)](data)) + .toTable(tEnv, 'a, 'b, 'c, 'rowtime.rowtime) + + tEnv.registerTable("T1", t1) + + val resultHopStartEndWithHaving = tEnv.sql(sqlQueryHopStartEndWithHaving).toAppendStream[Row] + resultHopStartEndWithHaving.addSink(new StreamITCase.StringSink[Row]) + + env.execute() + + val expected = List( + "Hello,2,1970-01-01 03:53:00.0,1970-01-01 03:54:00.0" + ) + 
assertEquals(expected.sorted, StreamITCase.testResults.sorted) + } + }