[hotfix] [table] Disable event-time OVER RANGE UNBOUNDED PRECEDING window.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/cac9fa02 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/cac9fa02 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/cac9fa02 Branch: refs/heads/table-retraction Commit: cac9fa0288e85c5d8315c71d4a39e1e926102e07 Parents: fe2c61a Author: Fabian Hueske <[email protected]> Authored: Fri Mar 24 21:22:11 2017 +0100 Committer: Fabian Hueske <[email protected]> Committed: Fri Mar 24 21:23:25 2017 +0100 ---------------------------------------------------------------------- .../datastream/DataStreamOverAggregate.scala | 21 ++++++---- .../table/api/scala/stream/sql/SqlITCase.scala | 40 ++++++++++---------- 2 files changed, 33 insertions(+), 28 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/cac9fa02/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamOverAggregate.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamOverAggregate.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamOverAggregate.scala index 3dd7ee2..01e5a9a 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamOverAggregate.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamOverAggregate.scala @@ -113,23 +113,28 @@ class DataStreamOverAggregate( if (overWindow.isRows) { // ROWS clause bounded OVER window throw new TableException( - "ROWS clause bounded proc-time OVER window no supported yet.") + "processing-time OVER ROWS PRECEDING window is not supported yet.") } else { // RANGE clause bounded OVER window throw new TableException( - "RANGE clause bounded proc-time OVER window no supported yet.") + "processing-time OVER RANGE PRECEDING window is not supported yet.") } } else { throw new TableException( - "OVER window only support ProcessingTime UNBOUNDED PRECEDING and CURRENT ROW " + - "condition.") + "processing-time OVER RANGE FOLLOWING window is not supported yet.") } case _: RowTimeType => // row-time OVER window if (overWindow.lowerBound.isPreceding && overWindow.lowerBound.isUnbounded && overWindow.upperBound.isCurrentRow) { - // unbounded preceding OVER window - createUnboundedAndCurrentRowEventTimeOverWindow(inputDS) + if (overWindow.isRows) { + // unbounded preceding OVER ROWS window + createUnboundedAndCurrentRowEventTimeOverWindow(inputDS) + } else { + // unbounded preceding OVER RANGE window + throw new TableException( + "row-time OVER RANGE UNBOUNDED PRECEDING window is not supported yet.") + } } else if (overWindow.lowerBound.isPreceding && overWindow.upperBound.isCurrentRow) { // bounded OVER window if (overWindow.isRows) { @@ -138,11 +143,11 @@ class DataStreamOverAggregate( } else { // RANGE clause bounded OVER window throw new TableException( - "RANGE clause bounded row-time OVER window no supported yet.") + "row-time OVER RANGE PRECEDING window is not supported yet.") } } else { throw new TableException( - "row-time OVER window only support CURRENT ROW condition.") + "row-time OVER RANGE FOLLOWING window is not supported yet.") } case _ => throw new TableException(s"Unsupported time type {$timeType}") http://git-wip-us.apache.org/repos/asf/flink/blob/cac9fa02/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/SqlITCase.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/SqlITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/SqlITCase.scala index 34a68b2..80ff42a 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/SqlITCase.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/SqlITCase.scala @@ -448,15 +448,15 @@ class SqlITCase extends StreamingWithStateTestBase { val sqlQuery = "SELECT a, b, c, " + "SUM(b) over (" + - "partition by a order by rowtime() range between unbounded preceding and current row), " + + "partition by a order by rowtime() rows between unbounded preceding and current row), " + "count(b) over (" + - "partition by a order by rowtime() range between unbounded preceding and current row), " + + "partition by a order by rowtime() rows between unbounded preceding and current row), " + "avg(b) over (" + - "partition by a order by rowtime() range between unbounded preceding and current row), " + + "partition by a order by rowtime() rows between unbounded preceding and current row), " + "max(b) over (" + - "partition by a order by rowtime() range between unbounded preceding and current row), " + + "partition by a order by rowtime() rows between unbounded preceding and current row), " + "min(b) over (" + - "partition by a order by rowtime() range between unbounded preceding and current row) " + + "partition by a order by rowtime() rows between unbounded preceding and current row) " + "from T1" val data = Seq( @@ -526,15 +526,15 @@ class SqlITCase extends StreamingWithStateTestBase { val sqlQuery = "SELECT a, b, c, " + "SUM(b) over (" + - "partition by a order by rowtime() range between unbounded preceding and current row), " + + "partition by a order by rowtime() rows between unbounded preceding and current row), " + "count(b) over (" + - "partition by a order by rowtime() range between unbounded preceding and current row), " + + "partition by a order by rowtime() rows between unbounded preceding and current row), " + "avg(b) over (" + - "partition by a order by rowtime() range between unbounded preceding and current row), " + + "partition by a order by rowtime() rows between unbounded preceding and current row), " + "max(b) over (" + - "partition by a order by rowtime() range between unbounded preceding and current row), " + + "partition by a order by rowtime() rows between unbounded preceding and current row), " + "min(b) over (" + - "partition by a order by rowtime() range between unbounded preceding and current row) " + + "partition by a order by rowtime() rows between unbounded preceding and current row) " + "from T1" val data = Seq( @@ -596,11 +596,11 @@ class SqlITCase extends StreamingWithStateTestBase { env.setParallelism(1) val sqlQuery = "SELECT a, b, c, " + - "SUM(b) over (order by rowtime() range between unbounded preceding and current row), " + - "count(b) over (order by rowtime() range between unbounded preceding and current row), " + - "avg(b) over (order by rowtime() range between unbounded preceding and current row), " + - "max(b) over (order by rowtime() range between unbounded preceding and current row), " + - "min(b) over (order by rowtime() range between unbounded preceding and current row) " + + "SUM(b) over (order by rowtime() rows between unbounded preceding and current row), " + + "count(b) over (order by rowtime() rows between unbounded preceding and current row), " + + "avg(b) over (order by rowtime() rows between unbounded preceding and current row), " + + "max(b) over (order by rowtime() rows between unbounded preceding and current row), " + + "min(b) over (order by rowtime() rows between unbounded preceding and current row) " + "from T1" val data = Seq( @@ -651,11 +651,11 @@ class SqlITCase extends StreamingWithStateTestBase { env.setParallelism(1) val sqlQuery = "SELECT a, b, c, " + - "SUM(b) over (order by rowtime() range between unbounded preceding and current row), " + - "count(b) over (order by rowtime() range between unbounded preceding and current row), " + - "avg(b) over (order by rowtime() range between unbounded preceding and current row), " + - "max(b) over (order by rowtime() range between unbounded preceding and current row), " + - "min(b) over (order by rowtime() range between unbounded preceding and current row) " + + "SUM(b) over (order by rowtime() rows between unbounded preceding and current row), " + + "count(b) over (order by rowtime() rows between unbounded preceding and current row), " + + "avg(b) over (order by rowtime() rows between unbounded preceding and current row), " + + "max(b) over (order by rowtime() rows between unbounded preceding and current row), " + + "min(b) over (order by rowtime() rows between unbounded preceding and current row) " + "from T1" val data = Seq(
