[hotfix] [table] Disable event-time OVER RANGE UNBOUNDED PRECEDING window.

Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/cac9fa02
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/cac9fa02
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/cac9fa02

Branch: refs/heads/table-retraction
Commit: cac9fa0288e85c5d8315c71d4a39e1e926102e07
Parents: fe2c61a
Author: Fabian Hueske <[email protected]>
Authored: Fri Mar 24 21:22:11 2017 +0100
Committer: Fabian Hueske <[email protected]>
Committed: Fri Mar 24 21:23:25 2017 +0100

----------------------------------------------------------------------
 .../datastream/DataStreamOverAggregate.scala    | 21 ++++++----
 .../table/api/scala/stream/sql/SqlITCase.scala  | 40 ++++++++++----------
 2 files changed, 33 insertions(+), 28 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/cac9fa02/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamOverAggregate.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamOverAggregate.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamOverAggregate.scala
index 3dd7ee2..01e5a9a 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamOverAggregate.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamOverAggregate.scala
@@ -113,23 +113,28 @@ class DataStreamOverAggregate(
           if (overWindow.isRows) {
             // ROWS clause bounded OVER window
             throw new TableException(
-              "ROWS clause bounded proc-time OVER window no supported yet.")
+              "processing-time OVER ROWS PRECEDING window is not supported yet.")
           } else {
             // RANGE clause bounded OVER window
             throw new TableException(
-              "RANGE clause bounded proc-time OVER window no supported yet.")
+              "processing-time OVER RANGE PRECEDING window is not supported yet.")
           }
         } else {
           throw new TableException(
-            "OVER window only support ProcessingTime UNBOUNDED PRECEDING and CURRENT ROW " +
-                "condition.")
+            "processing-time OVER RANGE FOLLOWING window is not supported yet.")
         }
       case _: RowTimeType =>
         // row-time OVER window
         if (overWindow.lowerBound.isPreceding &&
               overWindow.lowerBound.isUnbounded && overWindow.upperBound.isCurrentRow) {
-          // unbounded preceding OVER window
-          createUnboundedAndCurrentRowEventTimeOverWindow(inputDS)
+          if (overWindow.isRows) {
+            // unbounded preceding OVER ROWS window
+            createUnboundedAndCurrentRowEventTimeOverWindow(inputDS)
+          } else {
+            // unbounded preceding OVER RANGE window
+            throw new TableException(
+              "row-time OVER RANGE UNBOUNDED PRECEDING window is not supported yet.")
+          }
         } else if (overWindow.lowerBound.isPreceding && overWindow.upperBound.isCurrentRow) {
           // bounded OVER window
           if (overWindow.isRows) {
@@ -138,11 +143,11 @@ class DataStreamOverAggregate(
           } else {
             // RANGE clause bounded OVER window
             throw new TableException(
-              "RANGE clause bounded row-time OVER window no supported yet.")
+              "row-time OVER RANGE PRECEDING window is not supported yet.")
           }
         } else {
           throw new TableException(
-            "row-time OVER window only support CURRENT ROW condition.")
+            "row-time OVER RANGE FOLLOWING window is not supported yet.")
         }
       case _ =>
         throw new TableException(s"Unsupported time type {$timeType}")

http://git-wip-us.apache.org/repos/asf/flink/blob/cac9fa02/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/SqlITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/SqlITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/SqlITCase.scala
index 34a68b2..80ff42a 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/SqlITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/SqlITCase.scala
@@ -448,15 +448,15 @@ class SqlITCase extends StreamingWithStateTestBase {
 
     val sqlQuery = "SELECT a, b, c, " +
       "SUM(b) over (" +
-      "partition by a order by rowtime() range between unbounded preceding and current row), " +
+      "partition by a order by rowtime() rows between unbounded preceding and current row), " +
       "count(b) over (" +
-      "partition by a order by rowtime() range between unbounded preceding and current row), " +
+      "partition by a order by rowtime() rows between unbounded preceding and current row), " +
       "avg(b) over (" +
-      "partition by a order by rowtime() range between unbounded preceding and current row), " +
+      "partition by a order by rowtime() rows between unbounded preceding and current row), " +
       "max(b) over (" +
-      "partition by a order by rowtime() range between unbounded preceding and current row), " +
+      "partition by a order by rowtime() rows between unbounded preceding and current row), " +
       "min(b) over (" +
-      "partition by a order by rowtime() range between unbounded preceding and current row) " +
+      "partition by a order by rowtime() rows between unbounded preceding and current row) " +
       "from T1"
 
     val data = Seq(
@@ -526,15 +526,15 @@ class SqlITCase extends StreamingWithStateTestBase {
 
     val sqlQuery = "SELECT a, b, c, " +
       "SUM(b) over (" +
-      "partition by a order by rowtime() range between unbounded preceding and current row), " +
+      "partition by a order by rowtime() rows between unbounded preceding and current row), " +
       "count(b) over (" +
-      "partition by a order by rowtime() range between unbounded preceding and current row), " +
+      "partition by a order by rowtime() rows between unbounded preceding and current row), " +
       "avg(b) over (" +
-      "partition by a order by rowtime() range between unbounded preceding and current row), " +
+      "partition by a order by rowtime() rows between unbounded preceding and current row), " +
       "max(b) over (" +
-      "partition by a order by rowtime() range between unbounded preceding and current row), " +
+      "partition by a order by rowtime() rows between unbounded preceding and current row), " +
       "min(b) over (" +
-      "partition by a order by rowtime() range between unbounded preceding and current row) " +
+      "partition by a order by rowtime() rows between unbounded preceding and current row) " +
       "from T1"
 
     val data = Seq(
@@ -596,11 +596,11 @@ class SqlITCase extends StreamingWithStateTestBase {
     env.setParallelism(1)
 
     val sqlQuery = "SELECT a, b, c, " +
-      "SUM(b) over (order by rowtime() range between unbounded preceding and current row), " +
-      "count(b) over (order by rowtime() range between unbounded preceding and current row), " +
-      "avg(b) over (order by rowtime() range between unbounded preceding and current row), " +
-      "max(b) over (order by rowtime() range between unbounded preceding and current row), " +
-      "min(b) over (order by rowtime() range between unbounded preceding and current row) " +
+      "SUM(b) over (order by rowtime() rows between unbounded preceding and current row), " +
+      "count(b) over (order by rowtime() rows between unbounded preceding and current row), " +
+      "avg(b) over (order by rowtime() rows between unbounded preceding and current row), " +
+      "max(b) over (order by rowtime() rows between unbounded preceding and current row), " +
+      "min(b) over (order by rowtime() rows between unbounded preceding and current row) " +
       "from T1"
 
     val data = Seq(
@@ -651,11 +651,11 @@ class SqlITCase extends StreamingWithStateTestBase {
     env.setParallelism(1)
 
     val sqlQuery = "SELECT a, b, c, " +
-      "SUM(b) over (order by rowtime() range between unbounded preceding and current row), " +
-      "count(b) over (order by rowtime() range between unbounded preceding and current row), " +
-      "avg(b) over (order by rowtime() range between unbounded preceding and current row), " +
-      "max(b) over (order by rowtime() range between unbounded preceding and current row), " +
-      "min(b) over (order by rowtime() range between unbounded preceding and current row) " +
+      "SUM(b) over (order by rowtime() rows between unbounded preceding and current row), " +
+      "count(b) over (order by rowtime() rows between unbounded preceding and current row), " +
+      "avg(b) over (order by rowtime() rows between unbounded preceding and current row), " +
+      "max(b) over (order by rowtime() rows between unbounded preceding and current row), " +
+      "min(b) over (order by rowtime() rows between unbounded preceding and current row) " +
       "from T1"
 
     val data = Seq(

Reply via email to