This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 5fef3c1272e5be6ca20ed83f87c147a2d18379b0 Author: yuzhao.cyz <[email protected]> AuthorDate: Tue Mar 17 21:34:17 2020 +0800 [FLINK-14338][table-planner-blink] Update files due to CALCITE-1824 * GROUP_ID translation was fixed --- .../logical/DecomposeGroupingSetsRuleTest.xml | 61 ++++++++++------ .../plan/stream/sql/agg/GroupingSetsTest.scala | 3 + .../runtime/batch/sql/agg/GroupingSetsITCase.scala | 11 +-- .../runtime/batch/sql/GroupingSetsITCase.java | 83 +++++++++++++--------- .../api/batch/sql/DistinctAggregateTest.scala | 19 +---- .../table/api/batch/sql/GroupingSetsTest.scala | 47 ++++-------- .../table/runtime/batch/sql/AggregateITCase.scala | 31 +++++--- 7 files changed, 132 insertions(+), 123 deletions(-) diff --git a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/DecomposeGroupingSetsRuleTest.xml b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/DecomposeGroupingSetsRuleTest.xml index ca3284e..d656c98 100644 --- a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/DecomposeGroupingSetsRuleTest.xml +++ b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/DecomposeGroupingSetsRuleTest.xml @@ -24,16 +24,27 @@ SELECT a, GROUP_ID() AS g, COUNT(*) as c FROM MyTable GROUP BY GROUPING SETS (a, </Resource> <Resource name="planBefore"> <![CDATA[ -LogicalAggregate(group=[{0}], groups=[[{0}, {}]], g=[GROUP_ID()], c=[COUNT()]) -+- LogicalProject(a=[$0]) - +- LogicalTableScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]]) +LogicalUnion(all=[true]) +:- LogicalProject(a=[$0], g=[0:BIGINT], c=[$1]) +: +- LogicalAggregate(group=[{0}], groups=[[{0}, {}]], c=[COUNT()]) +: +- LogicalProject(a=[$0]) +: +- LogicalTableScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, 
c)]]]) ++- LogicalProject(a=[$0], g=[1:BIGINT], c=[$1]) + +- LogicalAggregate(group=[{0}], groups=[[{}]], c=[COUNT()]) + +- LogicalProject(a=[$0]) + +- LogicalTableScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]]) ]]> </Resource> <Resource name="planAfter"> <![CDATA[ -FlinkLogicalCalc(select=[a, 0:BIGINT AS g, c]) -+- FlinkLogicalAggregate(group=[{0, 1}], c=[COUNT()]) - +- FlinkLogicalExpand(projects=[{a=[$0], $e=[0]}, {a=[null], $e=[1]}]) +FlinkLogicalUnion(all=[true]) +:- FlinkLogicalCalc(select=[a, 0:BIGINT AS g, c]) +: +- FlinkLogicalAggregate(group=[{0, 1}], c=[COUNT()]) +: +- FlinkLogicalExpand(projects=[a, $e]) +: +- FlinkLogicalCalc(select=[a]) +: +- FlinkLogicalTableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c]) ++- FlinkLogicalCalc(select=[a, 1:BIGINT AS g, c]) + +- FlinkLogicalAggregate(group=[{0}], groups=[[{}]], c=[COUNT()]) +- FlinkLogicalCalc(select=[a]) +- FlinkLogicalTableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c]) ]]> @@ -56,16 +67,17 @@ FROM MyTable </Resource> <Resource name="planBefore"> <![CDATA[ -LogicalAggregate(group=[{0, 1}], groups=[[{0, 1}, {0}, {1}, {}]], a=[AVG($2)], g=[GROUP_ID()], gb=[GROUPING($0)], gc=[GROUPING($1)], gib=[GROUPING_ID($0)], gic=[GROUPING_ID($1)], gid=[GROUPING_ID($0, $1)]) -+- LogicalProject(b=[$1], c=[$2], a=[$0]) - +- LogicalTableScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]]) +LogicalProject(b=[$0], c=[$1], a=[$2], g=[0:BIGINT], gb=[$3], gc=[$4], gib=[$5], gic=[$6], gid=[$7]) ++- LogicalAggregate(group=[{0, 1}], groups=[[{0, 1}, {0}, {1}, {}]], a=[AVG($2)], gb=[GROUPING($0)], gc=[GROUPING($1)], gib=[GROUPING_ID($0)], gic=[GROUPING_ID($1)], gid=[GROUPING_ID($0, $1)]) + +- LogicalProject(b=[$1], c=[$2], a=[$0]) + +- LogicalTableScan(table=[[default_catalog, default_database, 
MyTable, source: [TestTableSource(a, b, c)]]]) ]]> </Resource> <Resource name="planAfter"> <![CDATA[ FlinkLogicalCalc(select=[b, c, a, 0:BIGINT AS g, CASE(OR(=($e, 0:BIGINT), =($e, 1:BIGINT)), 0:BIGINT, 1:BIGINT) AS gb, CASE(=($e, 0:BIGINT), 0:BIGINT, =($e, 1:BIGINT), 1:BIGINT, =($e, 2:BIGINT), 0:BIGINT, 1:BIGINT) AS gc, CASE(OR(=($e, 0:BIGINT), =($e, 1:BIGINT)), 0:BIGINT, 1:BIGINT) AS gib, CASE(=($e, 0:BIGINT), 0:BIGINT, =($e, 1:BIGINT), 1:BIGINT, =($e, 2:BIGINT), 0:BIGINT, 1:BIGINT) AS gic, CASE(=($e, 0:BIGINT), 0:BIGINT, =($e, 1:BIGINT), 1:BIGINT, =($e, 2:BIGINT), 2:BIGINT, 3:BIGINT) [...] +- FlinkLogicalAggregate(group=[{1, 2, 3}], a=[AVG($0)]) - +- FlinkLogicalExpand(projects=[{a=[$0], b=[$1], c=[$2], $e=[0]}, {a=[$0], b=[$1], c=[null], $e=[1]}, {a=[$0], b=[null], c=[$2], $e=[2]}, {a=[$0], b=[null], c=[null], $e=[3]}]) + +- FlinkLogicalExpand(projects=[a, b, c, $e]) +- FlinkLogicalTableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c]) ]]> </Resource> @@ -84,9 +96,10 @@ FROM MyTable </Resource> <Resource name="planBefore"> <![CDATA[ -LogicalAggregate(group=[{0}], a=[AVG($1)], g=[GROUP_ID()], gb=[GROUPING($0)], gib=[GROUPING_ID($0)]) -+- LogicalProject(b=[$1], a=[$0]) - +- LogicalTableScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]]) +LogicalProject(b=[$0], a=[$1], g=[0:BIGINT], gb=[$2], gib=[$3]) ++- LogicalAggregate(group=[{0}], a=[AVG($1)], gb=[GROUPING($0)], gib=[GROUPING_ID($0)]) + +- LogicalProject(b=[$1], a=[$0]) + +- LogicalTableScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]]) ]]> </Resource> <Resource name="planAfter"> @@ -107,16 +120,17 @@ GROUP BY GROUPING SETS (b, c) </Resource> <Resource name="planBefore"> <![CDATA[ -LogicalAggregate(group=[{0, 1}], groups=[[{0}, {1}]], a=[AVG($2)], g=[GROUP_ID()]) -+- LogicalProject(b=[$1], c=[$2], a=[$0]) - +- LogicalTableScan(table=[[default_catalog, 
default_database, MyTable, source: [TestTableSource(a, b, c)]]]) +LogicalProject(b=[$0], c=[$1], a=[$2], g=[0:BIGINT]) ++- LogicalAggregate(group=[{0, 1}], groups=[[{0}, {1}]], a=[AVG($2)]) + +- LogicalProject(b=[$1], c=[$2], a=[$0]) + +- LogicalTableScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]]) ]]> </Resource> <Resource name="planAfter"> <![CDATA[ FlinkLogicalCalc(select=[b, c, a, 0:BIGINT AS g]) +- FlinkLogicalAggregate(group=[{1, 2, 3}], a=[AVG($0)]) - +- FlinkLogicalExpand(projects=[{a=[$0], b=[$1], c=[null], $e=[1]}, {a=[$0], b=[null], c=[$2], $e=[2]}]) + +- FlinkLogicalExpand(projects=[a, b, c, $e]) +- FlinkLogicalTableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c]) ]]> </Resource> @@ -139,7 +153,7 @@ LogicalProject(a=[$3], b=[$4], c=[$5]) <![CDATA[ FlinkLogicalCalc(select=[a_0 AS a, b_0 AS b, c_0 AS c]) +- FlinkLogicalAggregate(group=[{0, 1, 2, 3}], a=[COUNT($0)], b=[COUNT($4)], c=[COUNT($5)]) - +- FlinkLogicalExpand(projects=[{a=[$0], b=[$1], c=[null], $e=[1], b_0=[$1], c_0=[$2]}, {a=[$0], b=[null], c=[$2], $e=[2], b_0=[$1], c_0=[$2]}]) + +- FlinkLogicalExpand(projects=[a, b, c, $e, b_0, c_0]) +- FlinkLogicalTableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c]) ]]> </Resource> @@ -161,16 +175,17 @@ FROM MyTable </Resource> <Resource name="planBefore"> <![CDATA[ -LogicalAggregate(group=[{0, 1}], groups=[[{0, 1}, {0}, {}]], a=[AVG($2)], g=[GROUP_ID()], gb=[GROUPING($0)], gc=[GROUPING($1)], gib=[GROUPING_ID($0)], gic=[GROUPING_ID($1)], gid=[GROUPING_ID($0, $1)]) -+- LogicalProject(b=[$1], c=[$2], a=[$0]) - +- LogicalTableScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]]) +LogicalProject(b=[$0], c=[$1], a=[$2], g=[0:BIGINT], gb=[$3], gc=[$4], gib=[$5], gic=[$6], gid=[$7]) ++- LogicalAggregate(group=[{0, 1}], groups=[[{0, 1}, {0}, 
{}]], a=[AVG($2)], gb=[GROUPING($0)], gc=[GROUPING($1)], gib=[GROUPING_ID($0)], gic=[GROUPING_ID($1)], gid=[GROUPING_ID($0, $1)]) + +- LogicalProject(b=[$1], c=[$2], a=[$0]) + +- LogicalTableScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]]) ]]> </Resource> <Resource name="planAfter"> <![CDATA[ FlinkLogicalCalc(select=[b, c, a, 0:BIGINT AS g, CASE(OR(=($e, 0:BIGINT), =($e, 1:BIGINT)), 0:BIGINT, 1:BIGINT) AS gb, CASE(=($e, 0:BIGINT), 0:BIGINT, 1:BIGINT) AS gc, CASE(OR(=($e, 0:BIGINT), =($e, 1:BIGINT)), 0:BIGINT, 1:BIGINT) AS gib, CASE(=($e, 0:BIGINT), 0:BIGINT, 1:BIGINT) AS gic, CASE(=($e, 0:BIGINT), 0:BIGINT, =($e, 1:BIGINT), 1:BIGINT, 3:BIGINT) AS gid]) +- FlinkLogicalAggregate(group=[{1, 2, 3}], a=[AVG($0)]) - +- FlinkLogicalExpand(projects=[{a=[$0], b=[$1], c=[$2], $e=[0]}, {a=[$0], b=[$1], c=[null], $e=[1]}, {a=[$0], b=[null], c=[null], $e=[3]}]) + +- FlinkLogicalExpand(projects=[a, b, c, $e]) +- FlinkLogicalTableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c]) ]]> </Resource> @@ -193,7 +208,7 @@ LogicalProject(b=[$2], c=[$3]) <![CDATA[ FlinkLogicalCalc(select=[b_0 AS b, c_0 AS c]) +- FlinkLogicalAggregate(group=[{0, 1, 2}], b=[COUNT($3)], c=[COUNT($4)]) - +- FlinkLogicalExpand(projects=[{b=[$0], c=[null], $e=[1], b_0=[$0], c_0=[$1]}, {b=[null], c=[$1], $e=[2], b_0=[$0], c_0=[$1]}]) + +- FlinkLogicalExpand(projects=[b, c, $e, b_0, c_0]) +- FlinkLogicalCalc(select=[b, c]) +- FlinkLogicalTableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c]) ]]> diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/agg/GroupingSetsTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/agg/GroupingSetsTest.scala index b371e81..652b8a0 100644 --- 
a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/agg/GroupingSetsTest.scala +++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/agg/GroupingSetsTest.scala @@ -19,6 +19,7 @@ package org.apache.flink.table.planner.plan.stream.sql.agg import org.apache.flink.api.scala._ +import org.apache.flink.table.api.TableException import org.apache.flink.table.api.scala._ import org.apache.flink.table.planner.plan.utils.FlinkRelOptUtil import org.apache.flink.table.planner.utils.{TableTestBase, TableTestUtil} @@ -354,6 +355,8 @@ class GroupingSetsTest extends TableTestBase { @Test def testCALCITE1824(): Unit = { + expectedException.expect(classOf[TableException]) + expectedException.expectMessage("GROUPING SETS are currently not supported") val sqlQuery = """ |SELECT deptno, GROUP_ID() AS g, COUNT(*) AS c diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/agg/GroupingSetsITCase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/agg/GroupingSetsITCase.scala index 584d073..a9c1be6 100644 --- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/agg/GroupingSetsITCase.scala +++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/agg/GroupingSetsITCase.scala @@ -395,13 +395,16 @@ class GroupingSetsITCase extends BatchTestBase { @Test def testCALCITE1824(): Unit = { - // TODO: - // When "[CALCITE-1824] GROUP_ID returns wrong result" is fixed, - // there will be an extra row (null, 1, 14). 
checkResult( "select deptno, group_id() as g, count(*) as c " + "from scott_emp group by grouping sets (deptno, (), ())", - Seq(row(10, 0, 3), row(20, 0, 5), row(30, 0, 6), row(null, 0, 14)) + Seq(row(10, 0, 3), + row(10, 1, 3), + row(20, 0, 5), + row(20, 1, 5), + row(30, 0, 6), + row(30, 1, 6), + row(null, 0, 14)) ) } diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/runtime/batch/sql/GroupingSetsITCase.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/runtime/batch/sql/GroupingSetsITCase.java index a9fd29b..c73eaff 100644 --- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/runtime/batch/sql/GroupingSetsITCase.java +++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/runtime/batch/sql/GroupingSetsITCase.java @@ -89,33 +89,33 @@ public class GroupingSetsITCase extends TableProgramsClusterTestBase { " GROUP BY GROUPING SETS (f1, f2, ())"; String expected = - "1,null,1,1,1,0,1,0,2,1\n" + - "6,null,18,1,1,0,1,0,2,6\n" + - "2,null,2,1,1,0,1,0,2,2\n" + - "4,null,8,1,1,0,1,0,2,4\n" + - "5,null,13,1,1,0,1,0,2,5\n" + - "3,null,5,1,1,0,1,0,2,3\n" + - "null,Comment#11,17,2,0,1,0,1,1,1\n" + - "null,Comment#8,14,2,0,1,0,1,1,1\n" + - "null,Comment#2,8,2,0,1,0,1,1,1\n" + - "null,Comment#1,7,2,0,1,0,1,1,1\n" + - "null,Comment#14,20,2,0,1,0,1,1,1\n" + - "null,Comment#7,13,2,0,1,0,1,1,1\n" + - "null,Comment#6,12,2,0,1,0,1,1,1\n" + - "null,Comment#3,9,2,0,1,0,1,1,1\n" + - "null,Comment#12,18,2,0,1,0,1,1,1\n" + - "null,Comment#5,11,2,0,1,0,1,1,1\n" + - "null,Comment#15,21,2,0,1,0,1,1,1\n" + - "null,Comment#4,10,2,0,1,0,1,1,1\n" + - "null,Hi,1,2,0,1,0,1,1,1\n" + - "null,Comment#10,16,2,0,1,0,1,1,1\n" + - "null,Hello world,3,2,0,1,0,1,1,1\n" + - "null,I am fine.,5,2,0,1,0,1,1,1\n" + - "null,Hello world, how are you?,4,2,0,1,0,1,1,1\n" + - "null,Comment#9,15,2,0,1,0,1,1,1\n" + - "null,Comment#13,19,2,0,1,0,1,1,1\n" + - "null,Luke Skywalker,6,2,0,1,0,1,1,1\n" + - 
"null,Hello,2,2,0,1,0,1,1,1\n" + + "1,null,1,0,1,0,1,0,2,1\n" + + "2,null,2,0,1,0,1,0,2,2\n" + + "3,null,5,0,1,0,1,0,2,3\n" + + "4,null,8,0,1,0,1,0,2,4\n" + + "5,null,13,0,1,0,1,0,2,5\n" + + "6,null,18,0,1,0,1,0,2,6\n" + + "null,Comment#1,7,0,0,1,0,1,1,1\n" + + "null,Comment#10,16,0,0,1,0,1,1,1\n" + + "null,Comment#11,17,0,0,1,0,1,1,1\n" + + "null,Comment#12,18,0,0,1,0,1,1,1\n" + + "null,Comment#13,19,0,0,1,0,1,1,1\n" + + "null,Comment#14,20,0,0,1,0,1,1,1\n" + + "null,Comment#15,21,0,0,1,0,1,1,1\n" + + "null,Comment#2,8,0,0,1,0,1,1,1\n" + + "null,Comment#3,9,0,0,1,0,1,1,1\n" + + "null,Comment#4,10,0,0,1,0,1,1,1\n" + + "null,Comment#5,11,0,0,1,0,1,1,1\n" + + "null,Comment#6,12,0,0,1,0,1,1,1\n" + + "null,Comment#7,13,0,0,1,0,1,1,1\n" + + "null,Comment#8,14,0,0,1,0,1,1,1\n" + + "null,Comment#9,15,0,0,1,0,1,1,1\n" + + "null,Hello world, how are you?,4,0,0,1,0,1,1,1\n" + + "null,Hello world,3,0,0,1,0,1,1,1\n" + + "null,Hello,2,0,0,1,0,1,1,1\n" + + "null,Hi,1,0,0,1,0,1,1,1\n" + + "null,I am fine.,5,0,0,1,0,1,1,1\n" + + "null,Luke Skywalker,6,0,0,1,0,1,1,1\n" + "null,null,11,0,0,0,0,0,0,21"; checkSql(query, expected); @@ -128,14 +128,27 @@ public class GroupingSetsITCase extends TableProgramsClusterTestBase { " GROUP BY GROUPING SETS (f1, f2)"; String expected = - "6,null,18,1\n5,null,13,1\n4,null,8,1\n3,null,5,1\n2,null,2,1\n1,null,1,1\n" + - "null,Luke Skywalker,6,2\nnull,I am fine.,5,2\nnull,Hi,1,2\n" + - "null,null,3,2\nnull,Hello,2,2\nnull,Comment#9,15,2\nnull,Comment#8,14,2\n" + - "null,Comment#7,13,2\nnull,Comment#6,12,2\nnull,Comment#5,11,2\n" + - "null,Comment#4,10,2\nnull,Comment#3,9,2\nnull,Comment#2,8,2\n" + - "null,Comment#15,21,2\nnull,Comment#14,20,2\nnull,Comment#13,19,2\n" + - "null,Comment#12,18,2\nnull,Comment#11,17,2\nnull,Comment#10,16,2\n" + - "null,Comment#1,7,2"; + "1,Hi,1,0\n" + + "2,Hello,2,0\n" + + "2,null,3,0\n" + + "3,I am fine.,5,0\n" + + "3,Luke Skywalker,6,0\n" + + "3,null,4,0\n" + + "4,Comment#1,7,0\n" + + "4,Comment#2,8,0\n" + + 
"4,Comment#3,9,0\n" + + "4,Comment#4,10,0\n" + + "5,Comment#5,11,0\n" + + "5,Comment#6,12,0\n" + + "5,Comment#7,13,0\n" + + "5,Comment#8,14,0\n" + + "5,Comment#9,15,0\n" + + "6,Comment#10,16,0\n" + + "6,Comment#11,17,0\n" + + "6,Comment#12,18,0\n" + + "6,Comment#13,19,0\n" + + "6,Comment#14,20,0\n" + + "6,Comment#15,21,0"; checkSql(query, expected); } diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/batch/sql/DistinctAggregateTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/batch/sql/DistinctAggregateTest.scala index d3c2795..ff5e560 100644 --- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/batch/sql/DistinctAggregateTest.scala +++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/batch/sql/DistinctAggregateTest.scala @@ -57,13 +57,7 @@ class DistinctAggregateTest extends TableTestBase { val sqlQuery = "SELECT COUNT(DISTINCT a), SUM(DISTINCT a), MAX(DISTINCT a) FROM MyTable" - val left = unaryNode("DataSetAggregate", - unaryNode("DataSetCalc", - batchTableNode(table), - term("select", "a")), - term("select", "MAX(a) AS EXPR$2")) - - val right = unaryNode( + val expected = unaryNode( "DataSetAggregate", unaryNode( "DataSetDistinct", @@ -74,18 +68,9 @@ class DistinctAggregateTest extends TableTestBase { ), term("distinct", "a") ), - term("select", "COUNT(a) AS EXPR$0", "SUM(a) AS EXPR$1") + term("select", "COUNT(a) AS EXPR$0", "SUM(a) AS EXPR$1", "MAX(a) AS EXPR$2") ) - val expected = unaryNode("DataSetCalc", - binaryNode("DataSetSingleRowJoin", - left, - right, - term("where", "true"), - term("join", "EXPR$2", "EXPR$0", "EXPR$1"), - term("joinType", "NestedLoopInnerJoin")), - term("select", "EXPR$0", "EXPR$1", "EXPR$2")) - util.verifySql(sqlQuery, expected) } diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/batch/sql/GroupingSetsTest.scala 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/batch/sql/GroupingSetsTest.scala index 7b811aa..59faa2c 100644 --- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/batch/sql/GroupingSetsTest.scala +++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/batch/sql/GroupingSetsTest.scala @@ -34,38 +34,15 @@ class GroupingSetsTest extends TableTestBase { val sqlQuery = "SELECT b, c, avg(a) as a, GROUP_ID() as g FROM MyTable " + "GROUP BY GROUPING SETS (b, c)" - val aggregate = binaryNode( - "DataSetUnion", - unaryNode( - "DataSetCalc", - unaryNode( - "DataSetAggregate", - unaryNode( - "DataSetCalc", - batchTableNode(table), - term("select", "b", "a") - ), - term("groupBy", "b"), - term("select", "b", "AVG(a) AS a") - ), - term("select", "b", "null:INTEGER AS c", "a", "1:BIGINT AS g") - ), + val aggregate = unaryNode( + "DataSetCalc", unaryNode( - "DataSetCalc", - unaryNode( - "DataSetAggregate", - unaryNode( - "DataSetCalc", - batchTableNode(table), - term("select", "c", "a") - ), - term("groupBy", "c"), - term("select", "c", "AVG(a) AS a") - ), - term("select", "null:BIGINT AS b", "c", "a", "2:BIGINT AS g") + "DataSetAggregate", + batchTableNode(table), + term("groupBy", "b", "c"), + term("select", "b", "c", "AVG(a) AS a") ), - term("all", "true"), - term("union", "b", "c", "a", "g") + term("select", "b", "c", "a", "0:BIGINT AS g") ) util.verifySql(sqlQuery, aggregate) @@ -91,7 +68,7 @@ class GroupingSetsTest extends TableTestBase { term("groupBy", "b", "c"), term("select", "b", "c", "AVG(a) AS a") ), - term("select", "b", "c", "a", "3:BIGINT AS g", "1:BIGINT AS gb", "1:BIGINT AS gc", + term("select", "b", "c", "a", "0:BIGINT AS g", "1:BIGINT AS gb", "1:BIGINT AS gc", "1:BIGINT AS gib", "1:BIGINT AS gic", "3:BIGINT AS gid") ) @@ -107,7 +84,7 @@ class GroupingSetsTest extends TableTestBase { term("groupBy", "b"), term("select", "b", "AVG(a) AS a") ), - term("select", "b", "null:INTEGER AS 
c", "a", "1:BIGINT AS g", "1:BIGINT AS gb", + term("select", "b", "null:INTEGER AS c", "a", "0:BIGINT AS g", "1:BIGINT AS gb", "0:BIGINT AS gc", "1:BIGINT AS gib", "0:BIGINT AS gic", "2:BIGINT AS gid") ) @@ -123,7 +100,7 @@ class GroupingSetsTest extends TableTestBase { term("groupBy", "c"), term("select", "c", "AVG(a) AS a") ), - term("select", "null:BIGINT AS b", "c", "a", "2:BIGINT AS g", "0:BIGINT AS gb", + term("select", "null:BIGINT AS b", "c", "a", "0:BIGINT AS g", "0:BIGINT AS gb", "1:BIGINT AS gc", "0:BIGINT AS gib", "1:BIGINT AS gic", "1:BIGINT AS gid") ) @@ -185,7 +162,7 @@ class GroupingSetsTest extends TableTestBase { term("groupBy", "b", "c"), term("select", "b", "c", "AVG(a) AS a") ), - term("select", "b", "c", "a", "3:BIGINT AS g", "1:BIGINT AS gb", "1:BIGINT AS gc", + term("select", "b", "c", "a", "0:BIGINT AS g", "1:BIGINT AS gb", "1:BIGINT AS gc", "1:BIGINT AS gib", "1:BIGINT AS gic", "3:BIGINT AS gid") ) @@ -201,7 +178,7 @@ class GroupingSetsTest extends TableTestBase { term("groupBy", "b"), term("select", "b", "AVG(a) AS a") ), - term("select", "b", "null:INTEGER AS c", "a", "1:BIGINT AS g", "1:BIGINT AS gb", + term("select", "b", "null:INTEGER AS c", "a", "0:BIGINT AS g", "1:BIGINT AS gb", "0:BIGINT AS gc", "1:BIGINT AS gib", "0:BIGINT AS gic", "2:BIGINT AS gid") ) diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/runtime/batch/sql/AggregateITCase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/runtime/batch/sql/AggregateITCase.scala index d2bc5aa..f7d0102 100644 --- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/runtime/batch/sql/AggregateITCase.scala +++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/runtime/batch/sql/AggregateITCase.scala @@ -29,6 +29,7 @@ import org.apache.flink.table.runtime.utils.TableProgramsTestBase.TableConfigMod import org.apache.flink.table.utils.NonMergableCount import 
org.apache.flink.test.util.TestBaseUtils import org.apache.flink.types.Row + import org.junit._ import org.junit.runner.RunWith import org.junit.runners.Parameterized @@ -232,7 +233,6 @@ class AggregateITCase( @Test def testGroupingSetAggregate(): Unit = { - val env = ExecutionEnvironment.getExecutionEnvironment val tEnv = BatchTableEnvironment.create(env, config) @@ -245,14 +245,27 @@ class AggregateITCase( val result = tEnv.sqlQuery(sqlQuery).toDataSet[Row].collect() val expected = - "6,null,18,1\n5,null,13,1\n4,null,8,1\n3,null,5,1\n2,null,2,1\n1,null,1,1\n" + - "null,Luke Skywalker,6,2\nnull,I am fine.,5,2\nnull,Hi,1,2\n" + - "null,Hello world, how are you?,4,2\nnull,Hello world,3,2\nnull,Hello,2,2\n" + - "null,Comment#9,15,2\nnull,Comment#8,14,2\nnull,Comment#7,13,2\n" + - "null,Comment#6,12,2\nnull,Comment#5,11,2\nnull,Comment#4,10,2\n" + - "null,Comment#3,9,2\nnull,Comment#2,8,2\nnull,Comment#15,21,2\n" + - "null,Comment#14,20,2\nnull,Comment#13,19,2\nnull,Comment#12,18,2\n" + - "null,Comment#11,17,2\nnull,Comment#10,16,2\nnull,Comment#1,7,2" + "1,Hi,1,0\n" + + "2,Hello world,3,0\n" + + "2,Hello,2,0\n" + + "3,Hello world, how are you?,4,0\n" + + "3,I am fine.,5,0\n" + + "3,Luke Skywalker,6,0\n" + + "4,Comment#1,7,0\n" + + "4,Comment#2,8,0\n" + + "4,Comment#3,9,0\n" + + "4,Comment#4,10,0\n" + + "5,Comment#5,11,0\n" + + "5,Comment#6,12,0\n" + + "5,Comment#7,13,0\n" + + "5,Comment#8,14,0\n" + + "5,Comment#9,15,0\n" + + "6,Comment#10,16,0\n" + + "6,Comment#11,17,0\n" + + "6,Comment#12,18,0\n" + + "6,Comment#13,19,0\n" + + "6,Comment#14,20,0\n" + + "6,Comment#15,21,0" TestBaseUtils.compareResultAsText(result.asJava, expected) }
