Repository: flink Updated Branches: refs/heads/release-1.4 8fa60ec20 -> e952408cb
[FLINK-8650] [table] Add documentation and tests for WINDOW clause. This closes #6226. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/e952408c Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/e952408c Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/e952408c Branch: refs/heads/release-1.4 Commit: e952408cb823e7b0c5c105e09f1556202c98320e Parents: 8fa60ec Author: snuyanzin <[email protected]> Authored: Thu Jun 28 19:19:25 2018 +0300 Committer: Fabian Hueske <[email protected]> Committed: Mon Jul 2 15:24:02 2018 +0200 ---------------------------------------------------------------------- docs/dev/table/sql.md | 31 +++++- .../table/api/stream/sql/OverWindowTest.scala | 101 +++++++++++++++++++ .../flink/table/utils/TableTestBase.scala | 13 +++ 3 files changed, 141 insertions(+), 4 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/e952408c/docs/dev/table/sql.md ---------------------------------------------------------------------- diff --git a/docs/dev/table/sql.md b/docs/dev/table/sql.md index 82211d5..5c9121b 100644 --- a/docs/dev/table/sql.md +++ b/docs/dev/table/sql.md @@ -115,6 +115,10 @@ The following BNF-grammar describes the superset of supported SQL features in ba ``` +insert: + INSERT INTO tableReference + query + query: values | { @@ -139,7 +143,8 @@ select: [ WHERE booleanExpression ] [ GROUP BY { groupItem [, groupItem ]* } ] [ HAVING booleanExpression ] - + [ WINDOW windowName AS windowSpec [, windowName AS windowSpec ]* ] + selectWithoutFrom: SELECT [ ALL | DISTINCT ] { * | projectItem [, projectItem ]* } @@ -176,9 +181,20 @@ groupItem: | ROLLUP '(' expression [, expression ]* ')' | GROUPING SETS '(' groupItem [, groupItem ]* ')' -insert: - INSERT INTO tableReference - query +windowRef: + windowName + | windowSpec + +windowSpec: + [ windowName ] + '(' + [ ORDER BY orderItem [, orderItem ]* ] + [ PARTITION BY expression [, expression ]* ] + [ + RANGE numericOrIntervalExpression {PRECEDING} + | ROWS numericExpression {PRECEDING} + ] + ')' ``` @@ -302,6 +318,13 @@ SELECT COUNT(amount) OVER ( ORDER BY proctime ROWS BETWEEN 2 PRECEDING AND CURRENT ROW) FROM Orders + +SELECT COUNT(amount) OVER w, SUM(amount) OVER w +FROM Orders +WINDOW w AS ( + PARTITION BY user + ORDER BY proctime + ROWS BETWEEN 2 PRECEDING AND CURRENT ROW) {% endhighlight %} </td> </tr> http://git-wip-us.apache.org/repos/asf/flink/blob/e952408c/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/OverWindowTest.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/OverWindowTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/OverWindowTest.scala index eea395c..8bdc8bc 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/OverWindowTest.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/OverWindowTest.scala @@ -41,6 +41,15 @@ class OverWindowTest extends TableTestBase { "CURRENT ROW) as sum1 " + "from MyTable" + val sql2 = "SELECT " + + "c, " + + "count(a) OVER (PARTITION BY c ORDER BY proctime ROWS BETWEEN 2 preceding AND " + + "CURRENT ROW) as cnt1, " + + "sum(a) OVER w as sum1 " + + "from MyTable " + + "WINDOW w AS (PARTITION BY c ORDER BY proctime ROWS BETWEEN 2 preceding AND CURRENT ROW)" + streamUtil.verifySqlPlansIdentical(sql, sql2) + val expected = unaryNode( "DataStreamCalc", @@ -70,6 +79,14 @@ class OverWindowTest extends TableTestBase { " RANGE BETWEEN INTERVAL '2' HOUR PRECEDING AND CURRENT ROW) AS avgA " + "FROM MyTable" + val sqlQuery2 = + "SELECT a, " + + " AVG(c) OVER w AS avgA " + + "FROM MyTable " + + "WINDOW w AS (PARTITION BY a ORDER BY proctime " + + " RANGE BETWEEN INTERVAL '2' HOUR PRECEDING AND CURRENT ROW)" + streamUtil.verifySqlPlansIdentical(sqlQuery, sqlQuery2) + val expected = unaryNode( "DataStreamCalc", @@ -107,6 +124,14 @@ class OverWindowTest extends TableTestBase { " RANGE BETWEEN INTERVAL '10' SECOND PRECEDING AND CURRENT ROW) " + "FROM MyTable" + val sqlQuery2 = + "SELECT a, " + + " COUNT(c) OVER w " + + "FROM MyTable " + + "WINDOW w AS (ORDER BY proctime " + + " RANGE BETWEEN INTERVAL '10' SECOND PRECEDING AND CURRENT ROW)" + streamUtil.verifySqlPlansIdentical(sqlQuery, sqlQuery2) + val expected = unaryNode( "DataStreamCalc", @@ -135,6 +160,13 @@ class OverWindowTest extends TableTestBase { "CURRENT ROW)" + "from MyTable" + val sql2 = "SELECT " + + "c, " + + "count(a) OVER w " + + "FROM MyTable " + + "WINDOW w AS (ORDER BY proctime ROWS BETWEEN 2 preceding AND CURRENT ROW)" + streamUtil.verifySqlPlansIdentical(sql, sql2) + val expected = unaryNode( "DataStreamCalc", @@ -163,6 +195,14 @@ class OverWindowTest extends TableTestBase { "sum(a) OVER (PARTITION BY c ORDER BY proctime RANGE UNBOUNDED preceding) as cnt2 " + "from MyTable" + val sql2 = "SELECT " + + "c, " + + "count(a) OVER w as cnt1, " + + "sum(a) OVER (PARTITION BY c ORDER BY proctime RANGE UNBOUNDED preceding) as cnt2 " + + "FROM MyTable " + + "WINDOW w AS (PARTITION BY c ORDER BY proctime RANGE UNBOUNDED preceding)" + streamUtil.verifySqlPlansIdentical(sql, sql2) + val expected = unaryNode( "DataStreamCalc", @@ -191,6 +231,13 @@ class OverWindowTest extends TableTestBase { "CURRENT ROW) " + "from MyTable" + val sql2 = "SELECT " + + "c, " + + "count(a) OVER w " + + "FROM MyTable " + + "WINDOW w AS (PARTITION BY c ORDER BY proctime ROWS UNBOUNDED preceding)" + streamUtil.verifySqlPlansIdentical(sql, sql2) + val expected = unaryNode( "DataStreamCalc", @@ -215,6 +262,14 @@ class OverWindowTest extends TableTestBase { "sum(a) OVER (ORDER BY proctime RANGE UNBOUNDED preceding) as cnt2 " + "from MyTable" + val sql2 = "SELECT " + + "c, " + + "count(a) OVER (ORDER BY proctime RANGE UNBOUNDED preceding) as cnt1, " + + "sum(a) OVER w as cnt2 " + + "FROM MyTable " + + "WINDOW w AS(ORDER BY proctime RANGE UNBOUNDED preceding)" + streamUtil.verifySqlPlansIdentical(sql, sql2) + val expected = unaryNode( "DataStreamCalc", @@ -242,6 +297,13 @@ class OverWindowTest extends TableTestBase { "CURRENT ROW) " + "from MyTable" + val sql2 = "SELECT " + + "c, " + + "count(a) OVER w " + + "FROM MyTable " + + "WINDOW w AS (ORDER BY proctime ROWS BETWEEN UNBOUNDED preceding AND CURRENT ROW)" + streamUtil.verifySqlPlansIdentical(sql, sql2) + val expected = unaryNode( "DataStreamCalc", @@ -502,4 +564,43 @@ class OverWindowTest extends TableTestBase { ) streamUtil.verifySql(sql, expected) } + + @Test + def testProcTimeBoundedPartitionedRowsOverDifferentWindows() = { + val sql = "SELECT " + + "a, " + + "SUM(c) OVER (PARTITION BY a ORDER BY proctime ROWS BETWEEN 3 PRECEDING AND CURRENT ROW), " + + "MIN(c) OVER (PARTITION BY a ORDER BY proctime ROWS BETWEEN 2 PRECEDING AND CURRENT ROW) " + + "FROM MyTable" + + val sql2 = "SELECT " + + "a, " + + "SUM(c) OVER w1, " + + "MIN(c) OVER w2 " + + "FROM MyTable " + + "WINDOW w1 AS (PARTITION BY a ORDER BY proctime ROWS BETWEEN 3 PRECEDING AND CURRENT ROW)," + + "w2 AS (PARTITION BY a ORDER BY proctime ROWS BETWEEN 2 PRECEDING AND CURRENT ROW)" + streamUtil.verifySqlPlansIdentical(sql, sql2) + + val expected = + unaryNode( + "DataStreamCalc", + unaryNode( + "DataStreamOverAggregate", + unaryNode( + "DataStreamCalc", + streamTableNode(0), + term("select", "a", "c", "proctime") + ), + term("partitionBy", "a"), + term("orderBy", "proctime"), + term("rows", "BETWEEN 3 PRECEDING AND CURRENT ROW"), + term("select", "a", "c", "proctime", "COUNT(c) AS w0$o0", + "$SUM0(c) AS w0$o1") + ), + term("select", "a", "CASE(>(w0$o0, 0)", "CAST(w0$o1), null) AS EXPR$1", "w1$o0 AS EXPR$2") + ) + + streamUtil.verifySql(sql, expected) + } } http://git-wip-us.apache.org/repos/asf/flink/blob/e952408c/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/TableTestBase.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/TableTestBase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/TableTestBase.scala index 804fad8..35501a7 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/TableTestBase.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/TableTestBase.scala @@ -274,12 +274,25 @@ case class StreamTableTestUtil() extends TableTestUtil { verifyTable(tableEnv.sqlQuery(query), expected) } + def verifySqlPlansIdentical(query1: String, queries: String*): Unit = { + val resultTable1 = tableEnv.sqlQuery(query1) + queries.foreach(s => verify2Tables(resultTable1, tableEnv.sqlQuery(s))) + } + def verifyTable(resultTable: Table, expected: String): Unit = { val relNode = resultTable.getRelNode val optimized = tableEnv.optimize(relNode, updatesAsRetraction = false) verifyString(expected, optimized) } + def verify2Tables(resultTable1: Table, resultTable2: Table): Unit = { + val relNode1 = resultTable1.getRelNode + val optimized1 = tableEnv.optimize(relNode1, updatesAsRetraction = false) + val relNode2 = resultTable2.getRelNode + val optimized2 = tableEnv.optimize(relNode2, updatesAsRetraction = false) + assertEquals(RelOptUtil.toString(optimized1), RelOptUtil.toString(optimized2)) + } + def verifyJavaSql(query: String, expected: String): Unit = { verifyJavaTable(javaTableEnv.sqlQuery(query), expected) }
