Repository: flink
Updated Branches:
  refs/heads/master 7ad489d87 -> a2580171d


[FLINK-6601] [table] Use time indicators in DataStreamLogicalWindowAggregateRule

This closes #3924.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/a2580171
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/a2580171
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/a2580171

Branch: refs/heads/master
Commit: a2580171dd6e9044c0694deea83a2a2f1f9eb1ee
Parents: 7ad489d
Author: twalthr <twal...@apache.org>
Authored: Tue May 16 16:34:54 2017 +0200
Committer: twalthr <twal...@apache.org>
Committed: Wed May 17 11:53:31 2017 +0200

----------------------------------------------------------------------
 .../DataStreamLogicalWindowAggregateRule.scala  | 34 +++++++++++---------
 .../scala/stream/sql/WindowAggregateTest.scala  |  6 ++--
 .../calcite/RelTimeIndicatorConverterTest.scala |  3 +-
 3 files changed, 23 insertions(+), 20 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/a2580171/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamLogicalWindowAggregateRule.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamLogicalWindowAggregateRule.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamLogicalWindowAggregateRule.scala
index 38de539..050e2cd 100644
--- 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamLogicalWindowAggregateRule.scala
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamLogicalWindowAggregateRule.scala
@@ -22,6 +22,7 @@ import java.math.{BigDecimal => JBigDecimal}
 
 import org.apache.calcite.rel.`type`.RelDataType
 import org.apache.calcite.rex._
+import org.apache.calcite.sql.`type`.SqlTypeName
 import org.apache.calcite.sql.fun.SqlStdOperatorTable
 import org.apache.flink.table.api.scala.{Session, Slide, Tumble}
 import org.apache.flink.table.api.{TableException, Window}
@@ -33,31 +34,34 @@ import org.apache.flink.table.typeutils.TimeIntervalTypeInfo
 class DataStreamLogicalWindowAggregateRule
   extends LogicalWindowAggregateRule("DataStreamLogicalWindowAggregateRule") {
 
-  /** Returns a zero literal of the correct time type */
+  /** Returns a reference to the time attribute with a time indicator type */
   override private[table] def getInAggregateGroupExpression(
       rexBuilder: RexBuilder,
-      windowExpression: RexCall): RexNode = createZeroLiteral(rexBuilder, 
windowExpression)
-
-  /** Returns a zero literal of the correct time type */
-  override private[table] def getOutAggregateGroupExpression(
-      rexBuilder: RexBuilder,
-      windowExpression: RexCall): RexNode = createZeroLiteral(rexBuilder, 
windowExpression)
-
-  private def createZeroLiteral(
-      rexBuilder: RexBuilder,
       windowExpression: RexCall): RexNode = {
 
-    val timeType = windowExpression.operands.get(0).getType
-    timeType match {
+    val timeAttribute = windowExpression.operands.get(0)
+    timeAttribute match {
 
-      case _ if FlinkTypeFactory.isTimeIndicatorType(timeType) =>
-        rexBuilder.makeLiteral(0L, timeType, true)
+      case _ if FlinkTypeFactory.isTimeIndicatorType(timeAttribute.getType) =>
+        timeAttribute
 
       case _ =>
-        throw TableException(s"""Time attribute expected but $timeType 
encountered.""")
+        throw TableException(
+          s"""Time attribute expected but ${timeAttribute.getType} 
encountered.""")
     }
   }
 
+  /** Returns a zero literal of a timestamp type */
+  override private[table] def getOutAggregateGroupExpression(
+      rexBuilder: RexBuilder,
+      windowExpression: RexCall): RexNode = {
+
+    rexBuilder.makeLiteral(
+      0L,
+      rexBuilder.getTypeFactory.createSqlType(SqlTypeName.TIMESTAMP),
+      true)
+  }
+
   override private[table] def translateWindowExpression(
       windowExpr: RexCall,
       rowType: RelDataType): Window = {

http://git-wip-us.apache.org/repos/asf/flink/blob/a2580171/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/WindowAggregateTest.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/WindowAggregateTest.scala
 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/WindowAggregateTest.scala
index 3729ef0..2022db8 100644
--- 
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/WindowAggregateTest.scala
+++ 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/WindowAggregateTest.scala
@@ -81,7 +81,7 @@ class WindowAggregateTest extends TableTestBase {
         unaryNode(
           "DataStreamCalc",
           streamTableNode(0),
-          term("select", "1970-01-01 00:00:00 AS $f0", "c", "a")
+          term("select", "rowtime", "c", "a")
         ),
         term("window", TumblingGroupWindow('w$, 'rowtime, 900000.millis)),
         term("select",
@@ -109,7 +109,7 @@ class WindowAggregateTest extends TableTestBase {
         unaryNode(
           "DataStreamCalc",
           streamTableNode(0),
-          term("select", "1970-01-01 00:00:00 AS $f0", "c", "a")
+          term("select", "proctime", "c", "a")
         ),
         term("window", SlidingGroupWindow('w$, 'proctime, 3600000.millis, 
900000.millis)),
         term("select",
@@ -138,7 +138,7 @@ class WindowAggregateTest extends TableTestBase {
         unaryNode(
           "DataStreamCalc",
           streamTableNode(0),
-          term("select", "1970-01-01 00:00:00 AS $f0", "c", "a")
+          term("select", "proctime", "c", "a")
         ),
         term("window", SessionGroupWindow('w$, 'proctime, 900000.millis)),
         term("select",

http://git-wip-us.apache.org/repos/asf/flink/blob/a2580171/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/calcite/RelTimeIndicatorConverterTest.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/calcite/RelTimeIndicatorConverterTest.scala
 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/calcite/RelTimeIndicatorConverterTest.scala
index cf55d48..8963ee2 100644
--- 
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/calcite/RelTimeIndicatorConverterTest.scala
+++ 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/calcite/RelTimeIndicatorConverterTest.scala
@@ -299,8 +299,7 @@ class RelTimeIndicatorConverterTest extends TableTestBase {
         unaryNode(
           "DataStreamCalc",
           streamTableNode(0),
-          term("select", "long", "1970-01-01 00:00:00 AS $f1",
-            "TIME_MATERIALIZATION(rowtime) AS $f2")
+          term("select", "long", "rowtime", "TIME_MATERIALIZATION(rowtime) AS 
$f2")
         ),
         term("groupBy", "long"),
         term(

Reply via email to