[FLINK-7357] [table] Fix translation of group window queries with window props and HAVING.

This closes #4521.
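
For context, here is a minimal sketch (mirroring the tests added below; table T1 with fields a, b, c and a rowtime attribute is the one registered in the new SqlITCase test) of the kind of query this fix enables: a group window aggregation that reads a window property (HOP_START) in the SELECT list and also filters on it in a HAVING clause.

    val sql =
      """
        |SELECT
        |  c, COUNT(a),
        |  HOP_START(rowtime, INTERVAL '1' MINUTE, INTERVAL '1' MINUTE) AS windowStart
        |FROM T1
        |GROUP BY HOP(rowtime, INTERVAL '1' MINUTE, INTERVAL '1' MINUTE), c
        |HAVING
        |  SUM(b) > 1 AND
        |  QUARTER(HOP_START(rowtime, INTERVAL '1' MINUTE, INTERVAL '1' MINUTE)) = 1
      """.stripMargin

Before this change, the optimizer only rewrote window auxiliary functions in the projections directly on top of a LogicalWindowAggregate, so the HOP_START call inside the HAVING filter was not translated.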


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/df5efe9c
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/df5efe9c
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/df5efe9c

Branch: refs/heads/master
Commit: df5efe9cead172994abb2fd4858a27caacd9468c
Parents: 73a2443
Author: Rong Rong <ro...@uber.com>
Authored: Thu Aug 10 10:46:25 2017 -0700
Committer: Fabian Hueske <fhue...@apache.org>
Committed: Wed Sep 20 10:12:13 2017 +0200

----------------------------------------------------------------------
 .../flink/table/plan/rules/FlinkRuleSets.scala  |   4 +-
 .../common/WindowStartEndPropertiesRule.scala   | 169 ++++++++++++-------
 .../table/api/batch/sql/GroupWindowTest.scala   |  41 +++++
 .../table/api/stream/sql/GroupWindowTest.scala  |  38 +++++
 .../table/runtime/stream/sql/SqlITCase.scala    |  51 ++++++
 5 files changed, 241 insertions(+), 62 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/df5efe9c/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/FlinkRuleSets.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/FlinkRuleSets.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/FlinkRuleSets.scala
index a81c7d2..073a8cc 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/FlinkRuleSets.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/FlinkRuleSets.scala
@@ -147,7 +147,8 @@ object FlinkRuleSets {
 
     // Transform window to LogicalWindowAggregate
     DataSetLogicalWindowAggregateRule.INSTANCE,
-    WindowStartEndPropertiesRule.INSTANCE
+    WindowStartEndPropertiesRule.INSTANCE,
+    WindowStartEndPropertiesHavingRule.INSTANCE
   )
 
   /**
@@ -179,6 +180,7 @@ object FlinkRuleSets {
     // Transform window to LogicalWindowAggregate
     DataStreamLogicalWindowAggregateRule.INSTANCE,
     WindowStartEndPropertiesRule.INSTANCE,
+    WindowStartEndPropertiesHavingRule.INSTANCE,
 
     // simplify expressions rules
     ReduceExpressionsRule.FILTER_INSTANCE,

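Why a second rule is registered here: with a HAVING clause, Calcite places a LogicalFilter between the top-level LogicalProject and the inner LogicalProject over the LogicalWindowAggregate, so the existing three-node operand pattern of WindowStartEndPropertiesRule never matches. The new WindowStartEndPropertiesHavingRule matches the four-node pattern and additionally rewrites window auxiliary calls in the filter condition. Roughly (a sketch of the matched plan shapes, not verbatim optimizer output):

    // WindowStartEndPropertiesRule          WindowStartEndPropertiesHavingRule
    //   LogicalProject                        LogicalProject
    //     LogicalProject                        LogicalFilter            (HAVING predicate)
    //       LogicalWindowAggregate                LogicalProject
    //                                               LogicalWindowAggregate
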
http://git-wip-us.apache.org/repos/asf/flink/blob/df5efe9c/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/common/WindowStartEndPropertiesRule.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/common/WindowStartEndPropertiesRule.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/common/WindowStartEndPropertiesRule.scala
index 14e9b21..33190e6 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/common/WindowStartEndPropertiesRule.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/common/WindowStartEndPropertiesRule.scala
@@ -18,20 +18,19 @@
 
 package org.apache.flink.table.plan.rules.common
 
-import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall}
-import org.apache.calcite.rel.logical.LogicalProject
+import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall, RelOptRuleOperand}
+import org.apache.calcite.rel.logical.{LogicalFilter, LogicalProject}
 import org.apache.calcite.rex.{RexCall, RexNode}
 import org.apache.calcite.sql.fun.SqlStdOperatorTable
+import org.apache.calcite.tools.RelBuilder
 import org.apache.flink.table.calcite.FlinkRelBuilder.NamedWindowProperty
 import org.apache.flink.table.expressions.{WindowEnd, WindowStart}
 import org.apache.flink.table.plan.logical.rel.LogicalWindowAggregate
 
 import scala.collection.JavaConversions._
 
-class WindowStartEndPropertiesRule
-  extends RelOptRule(
-    WindowStartEndPropertiesRule.WINDOW_EXPRESSION_RULE_PREDICATE,
-    "WindowStartEndPropertiesRule") {
+abstract class WindowStartEndPropertiesBaseRule(rulePredicate: RelOptRuleOperand, ruleName: String)
+  extends RelOptRule(rulePredicate, ruleName) {
 
   override def matches(call: RelOptRuleCall): Boolean = {
     val project = call.rel(0).asInstanceOf[LogicalProject]
@@ -49,61 +48,24 @@ class WindowStartEndPropertiesRule
     project.getProjects.exists(hasGroupAuxiliaries)
   }
 
-  override def onMatch(call: RelOptRuleCall): Unit = {
-
-    val project = call.rel(0).asInstanceOf[LogicalProject]
-    val innerProject = call.rel(1).asInstanceOf[LogicalProject]
-    val agg = call.rel(2).asInstanceOf[LogicalWindowAggregate]
-
-    // Retrieve window start and end properties
-    val transformed = call.builder()
-    val rexBuilder = transformed.getRexBuilder
-    transformed.push(LogicalWindowAggregate.create(
-      agg.getWindow,
-      Seq(
-        NamedWindowProperty("w$start", WindowStart(agg.getWindow.aliasAttribute)),
-        NamedWindowProperty("w$end", WindowEnd(agg.getWindow.aliasAttribute))
-      ), agg)
-    )
-
-    // forward window start and end properties
-    transformed.project(
-      innerProject.getProjects ++ Seq(transformed.field("w$start"), transformed.field("w$end")))
-
-    def replaceGroupAuxiliaries(node: RexNode): RexNode = {
-      node match {
-        case c: RexCall if WindowStartEndPropertiesRule.isWindowStart(c) =>
-          // replace expression by access to window start
-          rexBuilder.makeCast(c.getType, transformed.field("w$start"), false)
-        case c: RexCall if WindowStartEndPropertiesRule.isWindowEnd(c) =>
-          // replace expression by access to window end
-          rexBuilder.makeCast(c.getType, transformed.field("w$end"), false)
-        case c: RexCall =>
-          // replace expressions in children
-          val newOps = c.getOperands.map(replaceGroupAuxiliaries)
-          c.clone(c.getType, newOps)
-        case x =>
-          // preserve expression
-          x
-      }
+  def replaceGroupAuxiliaries(node: RexNode, relBuilder: RelBuilder): RexNode = {
+    val rexBuilder = relBuilder.getRexBuilder
+    node match {
+      case c: RexCall if isWindowStart(c) =>
+        // replace expression by access to window start
+        rexBuilder.makeCast(c.getType, relBuilder.field("w$start"), false)
+      case c: RexCall if isWindowEnd(c) =>
+        // replace expression by access to window end
+        rexBuilder.makeCast(c.getType, relBuilder.field("w$end"), false)
+      case c: RexCall =>
+        // replace expressions in children
+        val newOps = c.getOperands.map(x => replaceGroupAuxiliaries(x, relBuilder))
+        c.clone(c.getType, newOps)
+      case x =>
+        // preserve expression
+        x
     }
-
-    // replace window auxiliary function by access to window properties
-    transformed.project(
-      project.getProjects.map(replaceGroupAuxiliaries)
-    )
-    val res = transformed.build()
-    call.transformTo(res)
   }
-}
-
-object WindowStartEndPropertiesRule {
-  private val WINDOW_EXPRESSION_RULE_PREDICATE =
-    RelOptRule.operand(classOf[LogicalProject],
-      RelOptRule.operand(classOf[LogicalProject],
-        RelOptRule.operand(classOf[LogicalWindowAggregate], RelOptRule.none())))
-
-  val INSTANCE = new WindowStartEndPropertiesRule
 
   /** Checks if a RexNode is a window start auxiliary function. */
   private def isWindowStart(node: RexNode): Boolean = {
@@ -113,7 +75,7 @@ object WindowStartEndPropertiesRule {
           case SqlStdOperatorTable.TUMBLE_START |
                SqlStdOperatorTable.HOP_START |
                SqlStdOperatorTable.SESSION_START
-            => true
+          => true
           case _ => false
         }
       case _ => false
@@ -128,10 +90,95 @@ object WindowStartEndPropertiesRule {
           case SqlStdOperatorTable.TUMBLE_END |
                SqlStdOperatorTable.HOP_END |
                SqlStdOperatorTable.SESSION_END
-            => true
+          => true
           case _ => false
         }
       case _ => false
     }
   }
 }
+
+object WindowStartEndPropertiesRule {
+
+  val INSTANCE = new WindowStartEndPropertiesBaseRule(
+    RelOptRule.operand(classOf[LogicalProject],
+      RelOptRule.operand(classOf[LogicalProject],
+        RelOptRule.operand(classOf[LogicalWindowAggregate], RelOptRule.none()))),
+    "WindowStartEndPropertiesRule") {
+
+    override def onMatch(call: RelOptRuleCall): Unit = {
+
+      val project = call.rel(0).asInstanceOf[LogicalProject]
+      val innerProject = call.rel(1).asInstanceOf[LogicalProject]
+      val agg = call.rel(2).asInstanceOf[LogicalWindowAggregate]
+
+      // Retrieve window start and end properties
+      val builder = call.builder()
+      builder.push(LogicalWindowAggregate.create(
+        agg.getWindow,
+        Seq(
+          NamedWindowProperty("w$start", WindowStart(agg.getWindow.aliasAttribute)),
+          NamedWindowProperty("w$end", WindowEnd(agg.getWindow.aliasAttribute))),
+        agg)
+      )
+
+      // forward window start and end properties
+      builder.project(
+        innerProject.getProjects ++ Seq(builder.field("w$start"), builder.field("w$end")))
+
+      // replace window auxiliary function by access to window properties
+      builder.project(
+        project.getProjects.map(expr => replaceGroupAuxiliaries(expr, builder))
+      )
+      val res = builder.build()
+      call.transformTo(res)
+    }
+  }
+}
+
+object WindowStartEndPropertiesHavingRule {
+
+  val INSTANCE = new WindowStartEndPropertiesBaseRule(
+    RelOptRule.operand(classOf[LogicalProject],
+      RelOptRule.operand(classOf[LogicalFilter],
+        RelOptRule.operand(classOf[LogicalProject],
+          RelOptRule.operand(classOf[LogicalWindowAggregate], RelOptRule.none())))),
+    "WindowStartEndPropertiesHavingRule") {
+
+    override def onMatch(call: RelOptRuleCall): Unit = {
+
+      val project = call.rel(0).asInstanceOf[LogicalProject]
+      val filter = call.rel(1).asInstanceOf[LogicalFilter]
+      val innerProject = call.rel(2).asInstanceOf[LogicalProject]
+      val agg = call.rel(3).asInstanceOf[LogicalWindowAggregate]
+
+      // Retrieve window start and end properties
+      val builder = call.builder()
+      builder.push(LogicalWindowAggregate.create(
+        agg.getWindow,
+        Seq(
+          NamedWindowProperty("w$start", WindowStart(agg.getWindow.aliasAttribute)),
+          NamedWindowProperty("w$end", WindowEnd(agg.getWindow.aliasAttribute))),
+        agg)
+      )
+
+      // forward window start and end properties
+      builder.project(
+        innerProject.getProjects ++ Seq(builder.field("w$start"), builder.field("w$end")))
+
+      // replace window auxiliary functions in the HAVING condition by access to window properties
+      builder.filter(
+        filter.getChildExps.map(expr => replaceGroupAuxiliaries(expr, builder))
+      )
+
+      // replace window auxiliary function by access to window properties
+      builder.project(
+        project.getProjects.map(expr => replaceGroupAuxiliaries(expr, builder))
+      )
+
+      val res = builder.build()
+      call.transformTo(res)
+    }
+  }
+}
+

http://git-wip-us.apache.org/repos/asf/flink/blob/df5efe9c/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/sql/GroupWindowTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/sql/GroupWindowTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/sql/GroupWindowTest.scala
index e77087c..a78aa8c 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/sql/GroupWindowTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/sql/GroupWindowTest.scala
@@ -256,4 +256,45 @@ class GroupWindowTest extends TableTestBase {
 
     util.verifySql(sqlQuery, expected)
   }
+
+  @Test
+  def testExpressionOnWindowHavingFunction() = {
+    val util = batchTestUtil()
+    util.addTable[(Int, Long, String, Timestamp)]("T", 'a, 'b, 'c, 'ts)
+
+    val sql =
+      "SELECT " +
+        "  COUNT(*), " +
+        "  HOP_START(ts, INTERVAL '15' MINUTE, INTERVAL '1' MINUTE) " +
+        "FROM T " +
+        "GROUP BY HOP(ts, INTERVAL '15' MINUTE, INTERVAL '1' MINUTE) " +
+        "HAVING " +
+        "  SUM(a) > 0 AND " +
+        "  QUARTER(HOP_START(ts, INTERVAL '15' MINUTE, INTERVAL '1' MINUTE)) = 1"
+
+    val expected =
+      unaryNode(
+        "DataSetCalc",
+        unaryNode(
+          "DataSetWindowAggregate",
+          unaryNode(
+            "DataSetCalc",
+            batchTableNode(0),
+            term("select", "ts, a")
+          ),
+          term("window", SlidingGroupWindow('w$, 'ts, 60000.millis, 900000.millis)),
+          term("select",
+            "COUNT(*) AS EXPR$0",
+            "SUM(a) AS $f1",
+            "start('w$) AS w$start",
+            "end('w$) AS w$end")
+        ),
+        term("select", "EXPR$0", "CAST(w$start) AS w$start"),
+        term("where",
+          "AND(>($f1, 0), " +
+            "=(EXTRACT_DATE(FLAG(QUARTER), /INT(Reinterpret(CAST(w$start)), 86400000)), 1))")
+      )
+
+    util.verifySql(sql, expected)
+  }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/df5efe9c/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/GroupWindowTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/GroupWindowTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/GroupWindowTest.scala
index 4823903..722c4f0 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/GroupWindowTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/GroupWindowTest.scala
@@ -143,4 +143,42 @@ class GroupWindowTest extends TableTestBase {
 
     streamUtil.verifySql(sql, expected)
   }
+
+  @Test
+  def testExpressionOnWindowHavingFunction() = {
+    val sql =
+      "SELECT " +
+        "  COUNT(*), " +
+        "  HOP_START(rowtime, INTERVAL '15' MINUTE, INTERVAL '1' MINUTE) " +
+        "FROM MyTable " +
+        "GROUP BY HOP(rowtime, INTERVAL '15' MINUTE, INTERVAL '1' MINUTE) " +
+        "HAVING " +
+        "  SUM(a) > 0 AND " +
+        "  QUARTER(HOP_START(rowtime, INTERVAL '15' MINUTE, INTERVAL '1' MINUTE)) = 1"
+
+    val expected =
+      unaryNode(
+        "DataStreamCalc",
+        unaryNode(
+          "DataStreamGroupWindowAggregate",
+          unaryNode(
+            "DataStreamCalc",
+            streamTableNode(0),
+            term("select", "rowtime, a")
+          ),
+          term("window", SlidingGroupWindow('w$, 'rowtime, 60000.millis, 900000.millis)),
+          term("select",
+            "COUNT(*) AS EXPR$0",
+            "SUM(a) AS $f1",
+            "start('w$) AS w$start",
+            "end('w$) AS w$end")
+        ),
+        term("select", "EXPR$0", "w$start"),
+        term("where",
+          "AND(>($f1, 0), " +
+            "=(EXTRACT_DATE(FLAG(QUARTER), /INT(Reinterpret(w$start), 86400000)), 1))")
+      )
+
+    streamUtil.verifySql(sql, expected)
+  }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/df5efe9c/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/SqlITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/SqlITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/SqlITCase.scala
index d2f9a9a..5398c6d 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/SqlITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/SqlITCase.scala
@@ -21,9 +21,11 @@ package org.apache.flink.table.runtime.stream.sql
 import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
 import org.apache.flink.api.java.typeutils.RowTypeInfo
 import org.apache.flink.api.scala._
+import org.apache.flink.streaming.api.TimeCharacteristic
 import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
 import org.apache.flink.table.api.TableEnvironment
 import org.apache.flink.table.api.scala._
+import org.apache.flink.table.runtime.utils.TimeTestUtil.EventTimeSourceFunction
 import org.apache.flink.table.runtime.utils.{StreamITCase, StreamTestData, StreamingWithStateTestBase}
 import org.apache.flink.types.Row
 import org.junit.Assert._
@@ -314,5 +316,54 @@ class SqlITCase extends StreamingWithStateTestBase {
     assertEquals(expected.sorted, StreamITCase.testResults.sorted)
   }
 
+  @Test
+  def testHopStartEndWithHaving(): Unit = {
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+    env.setStateBackend(getStateBackend)
+    StreamITCase.clear
+    env.setParallelism(1)
+
+    val sqlQueryHopStartEndWithHaving =
+      """
+        |SELECT
+        |  c AS k,
+        |  COUNT(a) AS v,
+        |  HOP_START(rowtime, INTERVAL '1' MINUTE, INTERVAL '1' MINUTE) AS windowStart,
+        |  HOP_END(rowtime, INTERVAL '1' MINUTE, INTERVAL '1' MINUTE) AS windowEnd
+        |FROM T1
+        |GROUP BY HOP(rowtime, INTERVAL '1' MINUTE, INTERVAL '1' MINUTE), c
+        |HAVING
+        |  SUM(b) > 1 AND
+        |    QUARTER(HOP_START(rowtime, INTERVAL '1' MINUTE, INTERVAL '1' MINUTE)) = 1
+      """.stripMargin
+
+    val data = Seq(
+      Left(14000005L, (1, 1L, "Hi")),
+      Left(14000000L, (2, 1L, "Hello")),
+      Left(14000002L, (3, 1L, "Hello")),
+      Right(14000010L),
+      Left(8640000000L, (4, 1L, "Hello")), // data in a later quarter, used to validate the HAVING filter
+      Left(8640000001L, (4, 1L, "Hello")),
+      Right(8640000010L)
+    )
+
+    val t1 = env.addSource(new EventTimeSourceFunction[(Int, Long, String)](data))
+      .toTable(tEnv, 'a, 'b, 'c, 'rowtime.rowtime)
+
+    tEnv.registerTable("T1", t1)
+
+    val resultHopStartEndWithHaving = tEnv.sql(sqlQueryHopStartEndWithHaving).toAppendStream[Row]
+    resultHopStartEndWithHaving.addSink(new StreamITCase.StringSink[Row])
+
+    env.execute()
+
+    val expected = List(
+      "Hello,2,1970-01-01 03:53:00.0,1970-01-01 03:54:00.0"
+    )
+    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
+  }
+
 }
 

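A note on the test data above (a sketch of the intended timeline; the Left/Right encoding is the convention of TimeTestUtil.EventTimeSourceFunction used by these tests, where Left carries a record with its event timestamp and Right emits a watermark):

    // Left(ts, record) -> record emitted with event timestamp ts (milliseconds since epoch)
    // Right(ts)        -> watermark with timestamp ts
    //
    // 14000000L ms  ~ 1970-01-01 03:53:20 (Q1): the two "Hello" rows fall into the
    //   one-minute window 03:53:00-03:54:00 with SUM(b) = 2 > 1, so they pass HAVING;
    //   the single "Hi" row has SUM(b) = 1 and is filtered out.
    // 8640000000L ms ~ 1970-04-11 (Q2): QUARTER(HOP_START(...)) = 2, so these rows
    //   are removed by the HAVING clause, which is what the test validates.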