Repository: flink
Updated Branches:
  refs/heads/release-1.3 45923ffb8 -> 51fb7ed79


[FLINK-6614] [table] Fix translation of group auxiliary functions (e.g., TUMBLE_END).


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/51fb7ed7
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/51fb7ed7
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/51fb7ed7

Branch: refs/heads/release-1.3
Commit: 51fb7ed791bc8c5c1c35dffcd9855a2e5a8f3087
Parents: 0246ce5
Author: Fabian Hueske <fhue...@apache.org>
Authored: Wed May 17 16:26:27 2017 +0200
Committer: Fabian Hueske <fhue...@apache.org>
Committed: Thu May 18 22:06:00 2017 +0200

----------------------------------------------------------------------
 .../common/WindowStartEndPropertiesRule.scala   | 39 ++++++++++++++------
 .../scala/stream/sql/WindowAggregateTest.scala  | 27 ++++++++++++++
 2 files changed, 54 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/51fb7ed7/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/common/WindowStartEndPropertiesRule.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/common/WindowStartEndPropertiesRule.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/common/WindowStartEndPropertiesRule.scala
index 7577deb..14e9b21 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/common/WindowStartEndPropertiesRule.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/common/WindowStartEndPropertiesRule.scala
@@ -36,13 +36,21 @@ class WindowStartEndPropertiesRule
   override def matches(call: RelOptRuleCall): Boolean = {
     val project = call.rel(0).asInstanceOf[LogicalProject]
     // project includes at least one group auxiliary function
-    project.getProjects.exists {
-      case c: RexCall => c.getOperator.isGroupAuxiliary
-      case _ => false
+
+    def hasGroupAuxiliaries(node: RexNode): Boolean = {
+      node match {
+        case c: RexCall if c.getOperator.isGroupAuxiliary => true
+        case c: RexCall =>
+          c.operands.exists(hasGroupAuxiliaries)
+        case _ => false
+      }
     }
+
+    project.getProjects.exists(hasGroupAuxiliaries)
   }
 
   override def onMatch(call: RelOptRuleCall): Unit = {
+
     val project = call.rel(0).asInstanceOf[LogicalProject]
     val innerProject = call.rel(1).asInstanceOf[LogicalProject]
     val agg = call.rel(2).asInstanceOf[LogicalWindowAggregate]
@@ -62,20 +70,27 @@ class WindowStartEndPropertiesRule
     transformed.project(
       innerProject.getProjects ++ Seq(transformed.field("w$start"), transformed.field("w$end")))
 
-    // replace window auxiliary function by access to window properties
-    transformed.project(
-      project.getProjects.map{ x =>
-        if (WindowStartEndPropertiesRule.isWindowStart(x)) {
+    def replaceGroupAuxiliaries(node: RexNode): RexNode = {
+      node match {
+        case c: RexCall if WindowStartEndPropertiesRule.isWindowStart(c) =>
           // replace expression by access to window start
-          rexBuilder.makeCast(x.getType, transformed.field("w$start"), false)
-        } else if (WindowStartEndPropertiesRule.isWindowEnd(x)) {
+          rexBuilder.makeCast(c.getType, transformed.field("w$start"), false)
+        case c: RexCall if WindowStartEndPropertiesRule.isWindowEnd(c) =>
           // replace expression by access to window end
-          rexBuilder.makeCast(x.getType, transformed.field("w$end"), false)
-        } else {
+          rexBuilder.makeCast(c.getType, transformed.field("w$end"), false)
+        case c: RexCall =>
+          // replace expressions in children
+          val newOps = c.getOperands.map(replaceGroupAuxiliaries)
+          c.clone(c.getType, newOps)
+        case x =>
           // preserve expression
           x
-        }
       }
+    }
+
+    // replace window auxiliary function by access to window properties
+    transformed.project(
+      project.getProjects.map(replaceGroupAuxiliaries)
     )
     val res = transformed.build()
     call.transformTo(res)

http://git-wip-us.apache.org/repos/asf/flink/blob/51fb7ed7/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/WindowAggregateTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/WindowAggregateTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/WindowAggregateTest.scala
index 2022db8..f95d0ab 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/WindowAggregateTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/WindowAggregateTest.scala
@@ -150,6 +150,33 @@ class WindowAggregateTest extends TableTestBase {
     streamUtil.verifySql(sql, expected)
   }
 
+  @Test
+  def testExpressionOnWindowAuxFunction() = {
+    val sql =
+      "SELECT " +
+        "  COUNT(*), " +
+        "  TUMBLE_END(rowtime, INTERVAL '15' MINUTE) + INTERVAL '1' MINUTE " +
+        "FROM MyTable " +
+        "GROUP BY TUMBLE(rowtime, INTERVAL '15' MINUTE)"
+    val expected =
+      unaryNode(
+        "DataStreamCalc",
+        unaryNode(
+          "DataStreamGroupWindowAggregate",
+          unaryNode(
+            "DataStreamCalc",
+            streamTableNode(0),
+            term("select", "rowtime")
+          ),
+          term("window", TumblingGroupWindow('w$, 'rowtime, 900000.millis)),
+          term("select", "COUNT(*) AS EXPR$0", "start('w$) AS w$start", "end('w$) AS w$end")
+        ),
+        term("select", "EXPR$0", "DATETIME_PLUS(w$end, 60000) AS $f1")
+      )
+
+    streamUtil.verifySql(sql, expected)
+  }
+
   @Test(expected = classOf[TableException])
   def testTumbleWindowNoOffset(): Unit = {
     val sqlQuery =

Reply via email to