This is an automated email from the ASF dual-hosted git repository. jark pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 333885a5014bdd09bfe8ea6decfcd2c785c7603c Author: yuzhao.cyz <[email protected]> AuthorDate: Tue Mar 17 21:22:30 2020 +0800 [FLINK-14338][table-planner][table-planner-blink] Update files due to the builtin TUMBLE operator name change to $TUMBLE * This change was introduced in CALCITE-3382 --- .../functions/sql/FlinkSqlOperatorTable.java | 3 +- .../apache/flink/table/api/stream/ExplainTest.xml | 4 +- .../planner/plan/batch/sql/DagOptimizationTest.xml | 4 +- .../batch/sql/agg/AggregateReduceGroupingTest.xml | 38 +- .../plan/batch/sql/agg/WindowAggregateTest.xml | 452 ++++++++++----------- .../logical/AggregateReduceGroupingRuleTest.xml | 42 +- .../plan/stream/sql/MiniBatchIntervalInferTest.xml | 16 +- .../stream/sql/RelTimeIndicatorConverterTest.xml | 12 +- .../planner/plan/stream/sql/TableSourceTest.xml | 2 +- .../plan/stream/sql/agg/WindowAggregateTest.xml | 216 ++++------ .../flink/table/catalog/BasicOperatorTable.scala | 2 + 11 files changed, 376 insertions(+), 415 deletions(-) diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/sql/FlinkSqlOperatorTable.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/sql/FlinkSqlOperatorTable.java index 6cae5de..7ce4bd4 100644 --- a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/sql/FlinkSqlOperatorTable.java +++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/sql/FlinkSqlOperatorTable.java @@ -776,7 +776,8 @@ public class FlinkSqlOperatorTable extends ReflectiveSqlOperatorTable { * We need custom group auxiliary functions in order to support nested windows. */ public static final SqlGroupedWindowFunction TUMBLE = new SqlGroupedWindowFunction( - SqlKind.TUMBLE, null, + // The TUMBLE group function was hard-coded to $TUMBLE in CALCITE-3382. 
+ "$TUMBLE", SqlKind.TUMBLE, null, OperandTypes.or(OperandTypes.DATETIME_INTERVAL, OperandTypes.DATETIME_INTERVAL_TIME)) { @Override public List<SqlGroupedWindowFunction> getAuxiliaryFunctions() { diff --git a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/api/stream/ExplainTest.xml b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/api/stream/ExplainTest.xml index b90451f..7bdd8b8 100644 --- a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/api/stream/ExplainTest.xml +++ b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/api/stream/ExplainTest.xml @@ -638,7 +638,7 @@ SortLimit(orderBy=[a ASC], offset=[0], fetch=[5], updateAsRetraction=[false], ac LogicalSink(name=[`default_catalog`.`default_database`.`appendSink1`], fields=[a, b]) +- LogicalProject(id1=[$0], EXPR$1=[$2]) +- LogicalAggregate(group=[{0, 1}], EXPR$1=[LISTAGG($2, $3)]) - +- LogicalProject(id1=[$0], $f1=[TUMBLE($2, 8000:INTERVAL SECOND)], text=[$1], $f3=[_UTF-16LE'#']) + +- LogicalProject(id1=[$0], $f1=[$TUMBLE($2, 8000:INTERVAL SECOND)], text=[$1], $f3=[_UTF-16LE'#']) +- LogicalFilter(condition=[AND(=($0, $3), >($2, -($7, 300000:INTERVAL MINUTE)), <($2, +($7, 180000:INTERVAL MINUTE)))]) +- LogicalJoin(condition=[true], joinType=[inner]) :- LogicalWatermarkAssigner(rowtime=[rowtime], watermark=[-($2, 0:INTERVAL MILLISECOND)]) @@ -778,7 +778,7 @@ Union(all=[true], union=[a, b, c]) LogicalSink(name=[`default_catalog`.`default_database`.`appendSink1`], fields=[a, b]) +- LogicalProject(id1=[$0], EXPR$1=[$2]) +- LogicalAggregate(group=[{0, 1}], EXPR$1=[LISTAGG($2, $3)]) - +- LogicalProject(id1=[$0], $f1=[TUMBLE($2, 8000:INTERVAL SECOND)], text=[$1], $f3=[_UTF-16LE'#']) + +- LogicalProject(id1=[$0], $f1=[$TUMBLE($2, 8000:INTERVAL SECOND)], text=[$1], $f3=[_UTF-16LE'#']) +- LogicalFilter(condition=[AND(=($0, $3), >($2, -($7, 300000:INTERVAL MINUTE)), <($2, +($7, 180000:INTERVAL 
MINUTE)))]) +- LogicalJoin(condition=[true], joinType=[inner]) :- LogicalWatermarkAssigner(rowtime=[rowtime], watermark=[-($2, 0:INTERVAL MILLISECOND)]) diff --git a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/DagOptimizationTest.xml b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/DagOptimizationTest.xml index 7fe88c8..f48686c 100644 --- a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/DagOptimizationTest.xml +++ b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/DagOptimizationTest.xml @@ -570,13 +570,13 @@ Sink(name=[`default_catalog`.`default_database`.`sink2`], fields=[total_min]) LogicalSink(name=[`default_catalog`.`default_database`.`sink1`], fields=[a, sum_c, time, window_start, window_end]) +- LogicalProject(a=[$1], sum_c=[$2], time=[TUMBLE_END($0)], window_start=[TUMBLE_START($0)], window_end=[TUMBLE_END($0)]) +- LogicalAggregate(group=[{0, 1}], sum_c=[SUM($2)]) - +- LogicalProject($f0=[TUMBLE($3, 15000:INTERVAL SECOND)], a=[$0], $f2=[CAST($2):DOUBLE]) + +- LogicalProject($f0=[$TUMBLE($3, 15000:INTERVAL SECOND)], a=[$0], $f2=[CAST($2):DOUBLE]) +- LogicalTableScan(table=[[default_catalog, default_database, MyTable2, source: [TestTableSource(a, b, c, ts)]]]) LogicalSink(name=[`default_catalog`.`default_database`.`sink2`], fields=[a, sum_c, time]) +- LogicalProject(a=[$1], sum_c=[$2], time=[TUMBLE_END($0)]) +- LogicalAggregate(group=[{0, 1}], sum_c=[SUM($2)]) - +- LogicalProject($f0=[TUMBLE($3, 15000:INTERVAL SECOND)], a=[$0], $f2=[CAST($2):DOUBLE]) + +- LogicalProject($f0=[$TUMBLE($3, 15000:INTERVAL SECOND)], a=[$0], $f2=[CAST($2):DOUBLE]) +- LogicalTableScan(table=[[default_catalog, default_database, MyTable2, source: [TestTableSource(a, b, c, ts)]]]) ]]> </Resource> diff --git 
a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/agg/AggregateReduceGroupingTest.xml b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/agg/AggregateReduceGroupingTest.xml index aa9e8cc..3dab6fa 100644 --- a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/agg/AggregateReduceGroupingTest.xml +++ b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/agg/AggregateReduceGroupingTest.xml @@ -359,7 +359,7 @@ HashAggregate(isMerge=[true], groupBy=[a3, b3, a1], auxGrouping=[b1], select=[a3 <![CDATA[ LogicalProject(a4=[$0], b4=[$1], EXPR$2=[$3]) +- LogicalAggregate(group=[{0, 1, 2}], EXPR$2=[COUNT($3)]) - +- LogicalProject(a4=[$0], b4=[$1], $f2=[TUMBLE($3, 900000:INTERVAL MINUTE)], c4=[$2]) + +- LogicalProject(a4=[$0], b4=[$1], $f2=[$TUMBLE($3, 900000:INTERVAL MINUTE)], c4=[$2]) +- LogicalTableScan(table=[[default_catalog, default_database, T4, source: [TestTableSource(a4, b4, c4, d4)]]]) ]]> </Resource> @@ -379,7 +379,7 @@ HashWindowAggregate(groupBy=[a4], auxGrouping=[b4], window=[TumblingGroupWindow( <![CDATA[ LogicalProject(a4=[$0], c4=[$1], EXPR$2=[$3], EXPR$3=[$4]) +- LogicalAggregate(group=[{0, 1, 2}], EXPR$2=[COUNT($3)], EXPR$3=[AVG($3)]) - +- LogicalProject(a4=[$0], c4=[$2], $f2=[TUMBLE($3, 900000:INTERVAL MINUTE)], b4=[$1]) + +- LogicalProject(a4=[$0], c4=[$2], $f2=[$TUMBLE($3, 900000:INTERVAL MINUTE)], b4=[$1]) +- LogicalTableScan(table=[[default_catalog, default_database, T4, source: [TestTableSource(a4, b4, c4, d4)]]]) ]]> </Resource> @@ -400,7 +400,7 @@ HashWindowAggregate(groupBy=[a4], auxGrouping=[c4], window=[TumblingGroupWindow( LogicalAggregate(group=[{0, 1, 2}], EXPR$3=[COUNT($3)]) +- LogicalProject(a4=[$0], c4=[$1], s=[TUMBLE_START($2)], b4=[$3]) +- LogicalAggregate(group=[{0, 1, 2}], b4=[VAR_POP($3)]) - +- LogicalProject(a4=[$0], c4=[$2], 
$f2=[TUMBLE($3, 900000:INTERVAL MINUTE)], b4=[$1]) + +- LogicalProject(a4=[$0], c4=[$2], $f2=[$TUMBLE($3, 900000:INTERVAL MINUTE)], b4=[$1]) +- LogicalTableScan(table=[[default_catalog, default_database, T4, source: [TestTableSource(a4, b4, c4, d4)]]]) ]]> </Resource> @@ -427,7 +427,7 @@ Calc(select=[a4, c4, s, EXPR$3]) LogicalAggregate(group=[{0, 1, 2}], EXPR$3=[COUNT($3)]) +- LogicalProject(a4=[$0], c4=[$1], e=[TUMBLE_END($2)], b4=[$3]) +- LogicalAggregate(group=[{0, 1, 2}], b4=[VAR_POP($3)]) - +- LogicalProject(a4=[$0], c4=[$2], $f2=[TUMBLE($3, 900000:INTERVAL MINUTE)], b4=[$1]) + +- LogicalProject(a4=[$0], c4=[$2], $f2=[$TUMBLE($3, 900000:INTERVAL MINUTE)], b4=[$1]) +- LogicalTableScan(table=[[default_catalog, default_database, T4, source: [TestTableSource(a4, b4, c4, d4)]]]) ]]> </Resource> @@ -454,7 +454,7 @@ Calc(select=[a4, c4, e, EXPR$3]) LogicalAggregate(group=[{0, 1, 2}], EXPR$3=[COUNT()]) +- LogicalProject(a4=[$0], b4=[$3], c4=[$1]) +- LogicalAggregate(group=[{0, 1, 2}], b4=[VAR_POP($3)]) - +- LogicalProject(a4=[$0], c4=[$2], $f2=[TUMBLE($3, 900000:INTERVAL MINUTE)], b4=[$1]) + +- LogicalProject(a4=[$0], c4=[$2], $f2=[$TUMBLE($3, 900000:INTERVAL MINUTE)], b4=[$1]) +- LogicalTableScan(table=[[default_catalog, default_database, T4, source: [TestTableSource(a4, b4, c4, d4)]]]) ]]> </Resource> @@ -487,7 +487,7 @@ Calc(select=[a1, b1, c1, EXPR$3]) +- HashAggregate(isMerge=[true], groupBy=[a1, b1, c1, $e], select=[a1, b1, c1, $e, Final_COUNT(count$0) AS EXPR$3]) +- Exchange(distribution=[hash[a1, b1, c1, $e]]) +- LocalHashAggregate(groupBy=[a1, b1, c1, $e], select=[a1, b1, c1, $e, Partial_COUNT(d1) AS count$0]) - +- Expand(projects=[{a1=[$0], b1=[$1], c1=[$2], d1=[$3], $e=[0]}, {a1=[$0], b1=[$1], c1=[null], d1=[$3], $e=[1]}, {a1=[$0], b1=[null], c1=[$2], d1=[$3], $e=[2]}, {a1=[$0], b1=[null], c1=[null], d1=[$3], $e=[3]}, {a1=[null], b1=[$1], c1=[$2], d1=[$3], $e=[4]}, {a1=[null], b1=[$1], c1=[null], d1=[$3], $e=[5]}, {a1=[null], b1=[null], c1=[$2], d1=[$3], 
$e=[6]}, {a1=[null], b1=[null], c1=[null], d1=[$3], $e=[7]}], projects=[{a1, b1, c1, d1, 0 AS $e}, {a1, b1, null AS c1, d1, 1 AS $e} [...] + +- Expand(projects=[a1, b1, c1, d1, $e], projects=[{a1, b1, c1, d1, 0 AS $e}, {a1, b1, null AS c1, d1, 1 AS $e}, {a1, null AS b1, c1, d1, 2 AS $e}, {a1, null AS b1, null AS c1, d1, 3 AS $e}, {null AS a1, b1, c1, d1, 4 AS $e}, {null AS a1, b1, null AS c1, d1, 5 AS $e}, {null AS a1, null AS b1, c1, d1, 6 AS $e}, {null AS a1, null AS b1, null AS c1, d1, 7 AS $e}]) +- TableSourceScan(table=[[default_catalog, default_database, T1, source: [TestTableSource(a1, b1, c1, d1)]]], fields=[a1, b1, c1, d1]) ]]> </Resource> @@ -507,7 +507,7 @@ LogicalAggregate(group=[{0, 1, 2}], groups=[[{0, 1}, {0, 2}]], EXPR$3=[COUNT($3) Calc(select=[a1, b1, c1, EXPR$3]) +- HashAggregate(isMerge=[false], groupBy=[a1, b1, c1, $e], select=[a1, b1, c1, $e, COUNT(d1) AS EXPR$3]) +- Exchange(distribution=[hash[a1, b1, c1, $e]]) - +- Expand(projects=[{a1=[$0], b1=[$1], c1=[null], d1=[$3], $e=[1]}, {a1=[$0], b1=[null], c1=[$2], d1=[$3], $e=[2]}], projects=[{a1, b1, null AS c1, d1, 1 AS $e}, {a1, null AS b1, c1, d1, 2 AS $e}]) + +- Expand(projects=[a1, b1, c1, d1, $e], projects=[{a1, b1, null AS c1, d1, 1 AS $e}, {a1, null AS b1, c1, d1, 2 AS $e}]) +- TableSourceScan(table=[[default_catalog, default_database, T1, source: [TestTableSource(a1, b1, c1, d1)]]], fields=[a1, b1, c1, d1]) ]]> </Resource> @@ -530,7 +530,7 @@ Calc(select=[a1, s]) +- HashAggregate(isMerge=[true], groupBy=[a1, c1, $e], select=[a1, c1, $e, Final_SUM(sum$0) AS s]) +- Exchange(distribution=[hash[a1, c1, $e]]) +- LocalHashAggregate(groupBy=[a1, c1, $e], select=[a1, c1, $e, Partial_SUM(b1) AS sum$0]) - +- Expand(projects=[{a1=[$0], c1=[$1], b1=[$2], $e=[0]}, {a1=[$0], c1=[null], b1=[$2], $e=[1]}, {a1=[null], c1=[null], b1=[$2], $e=[3]}], projects=[{a1, c1, b1, 0 AS $e}, {a1, null AS c1, b1, 1 AS $e}, {null AS a1, null AS c1, b1, 3 AS $e}]) + +- Expand(projects=[a1, c1, b1, $e], projects=[{a1, 
c1, b1, 0 AS $e}, {a1, null AS c1, b1, 1 AS $e}, {null AS a1, null AS c1, b1, 3 AS $e}]) +- Calc(select=[a1, c1, b1]) +- TableSourceScan(table=[[default_catalog, default_database, T1, source: [TestTableSource(a1, b1, c1, d1)]]], fields=[a1, b1, c1, d1]) ]]> @@ -552,7 +552,7 @@ LogicalProject(a1=[$0], b1=[$1], c1=[$2], EXPR$3=[$4]) Calc(select=[a1, b1, c1, EXPR$3]) +- HashAggregate(isMerge=[false], groupBy=[a1, c1, d1, $e], auxGrouping=[b1], select=[a1, c1, d1, $e, b1, COUNT(d1_0) AS EXPR$3]) +- Exchange(distribution=[hash[a1, c1, d1, $e]]) - +- Expand(projects=[{a1=[$0], b1=[$1], c1=[$2], d1=[null], $e=[1], d1_0=[$3]}, {a1=[$0], b1=[$1], c1=[null], d1=[$3], $e=[2], d1_0=[$3]}], projects=[{a1, b1, c1, null AS d1, 1 AS $e, d1 AS d1_0}, {a1, b1, null AS c1, d1, 2 AS $e, d1 AS d1_0}]) + +- Expand(projects=[a1, b1, c1, d1, $e, d1_0], projects=[{a1, b1, c1, null AS d1, 1 AS $e, d1 AS d1_0}, {a1, b1, null AS c1, d1, 2 AS $e, d1 AS d1_0}]) +- TableSourceScan(table=[[default_catalog, default_database, T1, source: [TestTableSource(a1, b1, c1, d1)]]], fields=[a1, b1, c1, d1]) ]]> </Resource> @@ -614,7 +614,7 @@ Calc(select=[a1, b1, c1, EXPR$3]) +- HashAggregate(isMerge=[true], groupBy=[a1, b1, c1, $e], select=[a1, b1, c1, $e, Final_COUNT(count$0) AS EXPR$3]) +- Exchange(distribution=[hash[a1, b1, c1, $e]]) +- LocalHashAggregate(groupBy=[a1, b1, c1, $e], select=[a1, b1, c1, $e, Partial_COUNT(d1) AS count$0]) - +- Expand(projects=[{a1=[$0], b1=[$1], c1=[$2], d1=[$3], $e=[0]}, {a1=[$0], b1=[$1], c1=[null], d1=[$3], $e=[1]}, {a1=[$0], b1=[null], c1=[null], d1=[$3], $e=[3]}, {a1=[null], b1=[null], c1=[null], d1=[$3], $e=[7]}], projects=[{a1, b1, c1, d1, 0 AS $e}, {a1, b1, null AS c1, d1, 1 AS $e}, {a1, null AS b1, null AS c1, d1, 3 AS $e}, {null AS a1, null AS b1, null AS c1, d1, 7 AS $e}]) + +- Expand(projects=[a1, b1, c1, d1, $e], projects=[{a1, b1, c1, d1, 0 AS $e}, {a1, b1, null AS c1, d1, 1 AS $e}, {a1, null AS b1, null AS c1, d1, 3 AS $e}, {null AS a1, null AS b1, null AS 
c1, d1, 7 AS $e}]) +- TableSourceScan(table=[[default_catalog, default_database, T1, source: [TestTableSource(a1, b1, c1, d1)]]], fields=[a1, b1, c1, d1]) ]]> </Resource> @@ -697,7 +697,7 @@ LogicalAggregate(group=[{0, 1}], EXPR$2=[COUNT(DISTINCT $2)], EXPR$3=[SUM(DISTIN HashAggregate(isMerge=[false], groupBy=[a1], auxGrouping=[d1], select=[a1, d1, COUNT(c1) FILTER $g_4 AS EXPR$2, SUM(b1) FILTER $g_2 AS EXPR$3]) +- Exchange(distribution=[hash[a1]]) +- Calc(select=[a1, b1, c1, d1, =(CASE(=($e, 2:BIGINT), 2:BIGINT, 4:BIGINT), 2) AS $g_2, =(CASE(=($e, 2:BIGINT), 2:BIGINT, 4:BIGINT), 4) AS $g_4]) - +- Expand(projects=[{a1=[$0], b1=[$1], c1=[null], d1=[$3], $e=[2]}, {a1=[$0], b1=[null], c1=[$2], d1=[$3], $e=[4]}], projects=[{a1, b1, null AS c1, d1, 2 AS $e}, {a1, null AS b1, c1, d1, 4 AS $e}]) + +- Expand(projects=[a1, b1, c1, d1, $e], projects=[{a1, b1, null AS c1, d1, 2 AS $e}, {a1, null AS b1, c1, d1, 4 AS $e}]) +- TableSourceScan(table=[[default_catalog, default_database, T1, source: [TestTableSource(a1, b1, c1, d1)]]], fields=[a1, b1, c1, d1]) ]]> </Resource> @@ -715,7 +715,7 @@ LogicalAggregate(group=[{0}], EXPR$1=[SUM(DISTINCT $1)], EXPR$2=[MAX($1)], EXPR$ </Resource> <Resource name="planAfter"> <![CDATA[ -Calc(select=[a1, b1, b1 AS b10, c1]) +Calc(select=[a1, b1 AS EXPR$1, b1 AS EXPR$2, c1 AS EXPR$3]) +- TableSourceScan(table=[[default_catalog, default_database, T1, source: [TestTableSource(a1, b1, c1, d1)]]], fields=[a1, b1, c1, d1]) ]]> </Resource> @@ -737,7 +737,7 @@ SortAggregate(isMerge=[false], groupBy=[a1], select=[a1, MIN(d1) FILTER $g_1 AS +- Calc(select=[a1, c1, d1, b1 AS EXPR$3, b1 AS EXPR$4, =(CASE(=($e, 0:BIGINT), 0:BIGINT, 1:BIGINT), 0) AS $g_0, =(CASE(=($e, 0:BIGINT), 0:BIGINT, 1:BIGINT), 1) AS $g_1]) +- Sort(orderBy=[a1 ASC]) +- Exchange(distribution=[hash[a1]]) - +- Expand(projects=[{a1=[$0], b1=[$1], c1=[$2], d1=[$3], $e=[0]}, {a1=[$0], b1=[$1], c1=[null], d1=[$3], $e=[1]}], projects=[{a1, b1, c1, d1, 0 AS $e}, {a1, b1, null AS c1, d1, 1 AS 
$e}]) + +- Expand(projects=[a1, b1, c1, d1, $e], projects=[{a1, b1, c1, d1, 0 AS $e}, {a1, b1, null AS c1, d1, 1 AS $e}]) +- TableSourceScan(table=[[default_catalog, default_database, T1, source: [TestTableSource(a1, b1, c1, d1)]]], fields=[a1, b1, c1, d1]) ]]> </Resource> @@ -760,7 +760,7 @@ Calc(select=[EXPR$0]) +- HashAggregate(isMerge=[true], groupBy=[$f0], select=[$f0, Final_COUNT(count$0) AS EXPR$0]) +- Exchange(distribution=[hash[$f0]]) +- LocalHashAggregate(groupBy=[$f0], select=[$f0, Partial_COUNT(c1) AS count$0]) - +- Calc(select=[1 AS $f0, true AS $f1, c1]) + +- Calc(select=[1 AS $f0, c1]) +- TableSourceScan(table=[[default_catalog, default_database, T1, source: [TestTableSource(a1, b1, c1, d1)]]], fields=[a1, b1, c1, d1]) ]]> </Resource> @@ -841,7 +841,7 @@ LogicalProject(a1=[$0], b1=[$1], EXPR$2=[$4]) <![CDATA[ HashAggregate(isMerge=[false], groupBy=[a1], auxGrouping=[b1], select=[a1, b1, COUNT(c1) AS EXPR$2]) +- Exchange(distribution=[hash[a1]]) - +- Calc(select=[a1, b1, 1 AS $f2, true AS $f3, c1]) + +- Calc(select=[a1, b1, c1]) +- TableSourceScan(table=[[default_catalog, default_database, T1, source: [TestTableSource(a1, b1, c1, d1)]]], fields=[a1, b1, c1, d1]) ]]> </Resource> @@ -902,7 +902,7 @@ LogicalAggregate(group=[{0}], EXPR$1=[COUNT(DISTINCT $1)], EXPR$2=[SUM($1)]) HashAggregate(isMerge=[false], groupBy=[a1], select=[a1, COUNT(b1) FILTER $g_0 AS EXPR$1, MIN(EXPR$2) FILTER $g_1 AS EXPR$2]) +- Exchange(distribution=[hash[a1]]) +- Calc(select=[a1, b1, b1_0 AS EXPR$2, =(CASE(=($e, 0:BIGINT), 0:BIGINT, 1:BIGINT), 0) AS $g_0, =(CASE(=($e, 0:BIGINT), 0:BIGINT, 1:BIGINT), 1) AS $g_1]) - +- Expand(projects=[{a1=[$0], b1=[$1], c1=[$2], d1=[$3], $e=[0], b1_0=[$1]}, {a1=[$0], b1=[null], c1=[$2], d1=[$3], $e=[1], b1_0=[$1]}], projects=[{a1, b1, c1, d1, 0 AS $e, b1 AS b1_0}, {a1, null AS b1, c1, d1, 1 AS $e, b1 AS b1_0}]) + +- Expand(projects=[a1, b1, c1, d1, $e, b1_0], projects=[{a1, b1, c1, d1, 0 AS $e, b1 AS b1_0}, {a1, null AS b1, c1, d1, 1 AS $e, b1 AS 
b1_0}]) +- TableSourceScan(table=[[default_catalog, default_database, T1, source: [TestTableSource(a1, b1, c1, d1)]]], fields=[a1, b1, c1, d1]) ]]> </Resource> @@ -924,7 +924,7 @@ SortAggregate(isMerge=[false], groupBy=[a1], select=[a1, MIN(c1) FILTER $g_1 AS +- Sort(orderBy=[a1 ASC]) +- Exchange(distribution=[hash[a1]]) +- Calc(select=[a1, b1, c1, b1_0 AS EXPR$3, =(CASE(=($e, 0:BIGINT), 0:BIGINT, 1:BIGINT), 0) AS $g_0, =(CASE(=($e, 0:BIGINT), 0:BIGINT, 1:BIGINT), 1) AS $g_1]) - +- Expand(projects=[{a1=[$0], b1=[$1], c1=[$2], d1=[$3], $e=[0], b1_0=[$1]}, {a1=[$0], b1=[null], c1=[$2], d1=[$3], $e=[1], b1_0=[$1]}], projects=[{a1, b1, c1, d1, 0 AS $e, b1 AS b1_0}, {a1, null AS b1, c1, d1, 1 AS $e, b1 AS b1_0}]) + +- Expand(projects=[a1, b1, c1, d1, $e, b1_0], projects=[{a1, b1, c1, d1, 0 AS $e, b1 AS b1_0}, {a1, null AS b1, c1, d1, 1 AS $e, b1 AS b1_0}]) +- TableSourceScan(table=[[default_catalog, default_database, T1, source: [TestTableSource(a1, b1, c1, d1)]]], fields=[a1, b1, c1, d1]) ]]> </Resource> @@ -945,7 +945,7 @@ LogicalAggregate(group=[{0}], EXPR$1=[COUNT(DISTINCT $1)], EXPR$2=[SUM($2)]) HashAggregate(isMerge=[false], groupBy=[a1], select=[a1, COUNT(c1) FILTER $g_0 AS EXPR$1, MIN(EXPR$2) FILTER $g_1 AS EXPR$2]) +- Exchange(distribution=[hash[a1]]) +- Calc(select=[a1, c1, b1 AS EXPR$2, =(CASE(=($e, 0:BIGINT), 0:BIGINT, 1:BIGINT), 0) AS $g_0, =(CASE(=($e, 0:BIGINT), 0:BIGINT, 1:BIGINT), 1) AS $g_1]) - +- Expand(projects=[{a1=[$0], b1=[$1], c1=[$2], d1=[$3], $e=[0]}, {a1=[$0], b1=[$1], c1=[null], d1=[$3], $e=[1]}], projects=[{a1, b1, c1, d1, 0 AS $e}, {a1, b1, null AS c1, d1, 1 AS $e}]) + +- Expand(projects=[a1, b1, c1, d1, $e], projects=[{a1, b1, c1, d1, 0 AS $e}, {a1, b1, null AS c1, d1, 1 AS $e}]) +- TableSourceScan(table=[[default_catalog, default_database, T1, source: [TestTableSource(a1, b1, c1, d1)]]], fields=[a1, b1, c1, d1]) ]]> </Resource> @@ -967,7 +967,7 @@ SortAggregate(isMerge=[false], groupBy=[a1], select=[a1, MIN(d1) FILTER $g_1 AS +- 
Sort(orderBy=[a1 ASC]) +- Exchange(distribution=[hash[a1]]) +- Calc(select=[a1, c1, d1, b1 AS EXPR$3, =(CASE(=($e, 0:BIGINT), 0:BIGINT, 1:BIGINT), 0) AS $g_0, =(CASE(=($e, 0:BIGINT), 0:BIGINT, 1:BIGINT), 1) AS $g_1]) - +- Expand(projects=[{a1=[$0], b1=[$1], c1=[$2], d1=[$3], $e=[0]}, {a1=[$0], b1=[$1], c1=[null], d1=[$3], $e=[1]}], projects=[{a1, b1, c1, d1, 0 AS $e}, {a1, b1, null AS c1, d1, 1 AS $e}]) + +- Expand(projects=[a1, b1, c1, d1, $e], projects=[{a1, b1, c1, d1, 0 AS $e}, {a1, b1, null AS c1, d1, 1 AS $e}]) +- TableSourceScan(table=[[default_catalog, default_database, T1, source: [TestTableSource(a1, b1, c1, d1)]]], fields=[a1, b1, c1, d1]) ]]> </Resource> diff --git a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/agg/WindowAggregateTest.xml b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/agg/WindowAggregateTest.xml index 07b7b71..58fc85d 100644 --- a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/agg/WindowAggregateTest.xml +++ b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/agg/WindowAggregateTest.xml @@ -33,7 +33,7 @@ FROM MyTable1 <![CDATA[ LogicalProject(EXPR$0=[$1], EXPR$1=[$2], EXPR$2=[$3], EXPR$3=[$4], EXPR$4=[TUMBLE_START($0)], EXPR$5=[TUMBLE_END($0)]) +- LogicalAggregate(group=[{0}], EXPR$0=[VAR_POP($1)], EXPR$1=[VAR_SAMP($1)], EXPR$2=[STDDEV_POP($1)], EXPR$3=[STDDEV_SAMP($1)]) - +- LogicalProject($f0=[TUMBLE($0, 900000:INTERVAL MINUTE)], b=[$2]) + +- LogicalProject($f0=[$TUMBLE($0, 900000:INTERVAL MINUTE)], b=[$2]) +- LogicalTableScan(table=[[default_catalog, default_database, MyTable1, source: [TestTableSource(ts, a, b, c)]]]) ]]> </Resource> @@ -65,7 +65,7 @@ FROM MyTable1 <![CDATA[ LogicalProject(EXPR$0=[$1], EXPR$1=[$2], EXPR$2=[$3], EXPR$3=[$4], EXPR$4=[TUMBLE_START($0)], EXPR$5=[TUMBLE_END($0)]) +- 
LogicalAggregate(group=[{0}], EXPR$0=[VAR_POP($1)], EXPR$1=[VAR_SAMP($1)], EXPR$2=[STDDEV_POP($1)], EXPR$3=[STDDEV_SAMP($1)]) - +- LogicalProject($f0=[TUMBLE($0, 900000:INTERVAL MINUTE)], b=[$2]) + +- LogicalProject($f0=[$TUMBLE($0, 900000:INTERVAL MINUTE)], b=[$2]) +- LogicalTableScan(table=[[default_catalog, default_database, MyTable1, source: [TestTableSource(ts, a, b, c)]]]) ]]> </Resource> @@ -96,7 +96,7 @@ FROM MyTable1 <![CDATA[ LogicalProject(EXPR$0=[$1], EXPR$1=[$2], EXPR$2=[$3], EXPR$3=[$4], EXPR$4=[TUMBLE_START($0)], EXPR$5=[TUMBLE_END($0)]) +- LogicalAggregate(group=[{0}], EXPR$0=[VAR_POP($1)], EXPR$1=[VAR_SAMP($1)], EXPR$2=[STDDEV_POP($1)], EXPR$3=[STDDEV_SAMP($1)]) - +- LogicalProject($f0=[TUMBLE($0, 900000:INTERVAL MINUTE)], b=[$2]) + +- LogicalProject($f0=[$TUMBLE($0, 900000:INTERVAL MINUTE)], b=[$2]) +- LogicalTableScan(table=[[default_catalog, default_database, MyTable1, source: [TestTableSource(ts, a, b, c)]]]) ]]> </Resource> @@ -111,154 +111,6 @@ Calc(select=[CAST(/(-($f0, /(*($f1, $f1), $f2)), $f2)) AS EXPR$0, CAST(/(-($f0, ]]> </Resource> </TestCase> - <TestCase name="testWindowAggregateWithDifferentWindows[aggStrategy=AUTO]"> - <Resource name="sql"> - <![CDATA[ -WITH window_1h AS ( - SELECT 1 - FROM MyTable2 - GROUP BY HOP(`ts`, INTERVAL '1' HOUR, INTERVAL '1' HOUR) -), - -window_2h AS ( - SELECT 1 - FROM MyTable2 - GROUP BY HOP(`ts`, INTERVAL '1' HOUR, INTERVAL '2' HOUR) -) - -(SELECT * FROM window_1h) -UNION ALL -(SELECT * FROM window_2h) -]]> - </Resource> - <Resource name="planBefore"> - <![CDATA[ -LogicalUnion(all=[true]) -:- LogicalProject(EXPR$0=[1]) -: +- LogicalAggregate(group=[{0}]) -: +- LogicalProject($f0=[HOP($4, 3600000:INTERVAL HOUR, 3600000:INTERVAL HOUR)]) -: +- LogicalTableScan(table=[[default_catalog, default_database, MyTable2, source: [TestTableSource(a, b, c, d, ts)]]]) -+- LogicalProject(EXPR$0=[1]) - +- LogicalAggregate(group=[{0}]) - +- LogicalProject($f0=[HOP($4, 3600000:INTERVAL HOUR, 7200000:INTERVAL HOUR)]) - +- 
LogicalTableScan(table=[[default_catalog, default_database, MyTable2, source: [TestTableSource(a, b, c, d, ts)]]]) -]]> - </Resource> - <Resource name="planAfter"> - <![CDATA[ -Union(all=[true], union=[EXPR$0]) -:- Calc(select=[1 AS EXPR$0]) -: +- HashWindowAggregate(window=[SlidingGroupWindow('w$, ts, 3600000, 3600000)], select=[]) -: +- Exchange(distribution=[single]) -: +- LocalHashWindowAggregate(window=[SlidingGroupWindow('w$, ts, 3600000, 3600000)], select=[]) -: +- Calc(select=[ts], reuse_id=[1]) -: +- TableSourceScan(table=[[default_catalog, default_database, MyTable2, source: [TestTableSource(a, b, c, d, ts)]]], fields=[a, b, c, d, ts]) -+- Calc(select=[1 AS EXPR$0]) - +- HashWindowAggregate(window=[SlidingGroupWindow('w$, ts, 7200000, 3600000)], select=[]) - +- Exchange(distribution=[single]) - +- LocalHashWindowAggregate(window=[SlidingGroupWindow('w$, ts, 7200000, 3600000)], select=[]) - +- Reused(reference_id=[1]) -]]> - </Resource> - </TestCase> - <TestCase name="testWindowAggregateWithDifferentWindows[aggStrategy=ONE_PHASE]"> - <Resource name="sql"> - <![CDATA[ -WITH window_1h AS ( - SELECT 1 - FROM MyTable2 - GROUP BY HOP(`ts`, INTERVAL '1' HOUR, INTERVAL '1' HOUR) -), - -window_2h AS ( - SELECT 1 - FROM MyTable2 - GROUP BY HOP(`ts`, INTERVAL '1' HOUR, INTERVAL '2' HOUR) -) - -(SELECT * FROM window_1h) -UNION ALL -(SELECT * FROM window_2h) -]]> - </Resource> - <Resource name="planBefore"> - <![CDATA[ -LogicalUnion(all=[true]) -:- LogicalProject(EXPR$0=[1]) -: +- LogicalAggregate(group=[{0}]) -: +- LogicalProject($f0=[HOP($4, 3600000:INTERVAL HOUR, 3600000:INTERVAL HOUR)]) -: +- LogicalTableScan(table=[[default_catalog, default_database, MyTable2, source: [TestTableSource(a, b, c, d, ts)]]]) -+- LogicalProject(EXPR$0=[1]) - +- LogicalAggregate(group=[{0}]) - +- LogicalProject($f0=[HOP($4, 3600000:INTERVAL HOUR, 7200000:INTERVAL HOUR)]) - +- LogicalTableScan(table=[[default_catalog, default_database, MyTable2, source: [TestTableSource(a, b, c, d, 
ts)]]]) -]]> - </Resource> - <Resource name="planAfter"> - <![CDATA[ -Union(all=[true], union=[EXPR$0]) -:- Calc(select=[1 AS EXPR$0]) -: +- HashWindowAggregate(window=[SlidingGroupWindow('w$, ts, 3600000, 3600000)], select=[]) -: +- Exchange(distribution=[single], reuse_id=[1]) -: +- Calc(select=[ts]) -: +- TableSourceScan(table=[[default_catalog, default_database, MyTable2, source: [TestTableSource(a, b, c, d, ts)]]], fields=[a, b, c, d, ts]) -+- Calc(select=[1 AS EXPR$0]) - +- SortWindowAggregate(window=[SlidingGroupWindow('w$, ts, 7200000, 3600000)], select=[]) - +- Sort(orderBy=[ts ASC]) - +- Reused(reference_id=[1]) -]]> - </Resource> - </TestCase> - <TestCase name="testWindowAggregateWithDifferentWindows[aggStrategy=TWO_PHASE]"> - <Resource name="sql"> - <![CDATA[ -WITH window_1h AS ( - SELECT 1 - FROM MyTable2 - GROUP BY HOP(`ts`, INTERVAL '1' HOUR, INTERVAL '1' HOUR) -), - -window_2h AS ( - SELECT 1 - FROM MyTable2 - GROUP BY HOP(`ts`, INTERVAL '1' HOUR, INTERVAL '2' HOUR) -) - -(SELECT * FROM window_1h) -UNION ALL -(SELECT * FROM window_2h) -]]> - </Resource> - <Resource name="planBefore"> - <![CDATA[ -LogicalUnion(all=[true]) -:- LogicalProject(EXPR$0=[1]) -: +- LogicalAggregate(group=[{0}]) -: +- LogicalProject($f0=[HOP($4, 3600000:INTERVAL HOUR, 3600000:INTERVAL HOUR)]) -: +- LogicalTableScan(table=[[default_catalog, default_database, MyTable2, source: [TestTableSource(a, b, c, d, ts)]]]) -+- LogicalProject(EXPR$0=[1]) - +- LogicalAggregate(group=[{0}]) - +- LogicalProject($f0=[HOP($4, 3600000:INTERVAL HOUR, 7200000:INTERVAL HOUR)]) - +- LogicalTableScan(table=[[default_catalog, default_database, MyTable2, source: [TestTableSource(a, b, c, d, ts)]]]) -]]> - </Resource> - <Resource name="planAfter"> - <![CDATA[ -Union(all=[true], union=[EXPR$0]) -:- Calc(select=[1 AS EXPR$0]) -: +- HashWindowAggregate(window=[SlidingGroupWindow('w$, ts, 3600000, 3600000)], select=[]) -: +- Exchange(distribution=[single]) -: +- 
LocalHashWindowAggregate(window=[SlidingGroupWindow('w$, ts, 3600000, 3600000)], select=[]) -: +- Calc(select=[ts], reuse_id=[1]) -: +- TableSourceScan(table=[[default_catalog, default_database, MyTable2, source: [TestTableSource(a, b, c, d, ts)]]], fields=[a, b, c, d, ts]) -+- Calc(select=[1 AS EXPR$0]) - +- HashWindowAggregate(window=[SlidingGroupWindow('w$, ts, 7200000, 3600000)], select=[]) - +- Exchange(distribution=[single]) - +- LocalHashWindowAggregate(window=[SlidingGroupWindow('w$, ts, 7200000, 3600000)], select=[]) - +- Reused(reference_id=[1]) -]]> - </Resource> - </TestCase> <TestCase name="testExpressionOnWindowHavingFunction[aggStrategy=AUTO]"> <Resource name="sql"> <![CDATA[ @@ -471,7 +323,7 @@ Calc(select=[EXPR$0, w$start AS EXPR$1, w$end AS EXPR$2]) <![CDATA[ LogicalProject(EXPR$0=[$1], EXPR$1=[$2]) +- LogicalAggregate(group=[{0}], EXPR$0=[AVG($1)], EXPR$1=[SUM($2)]) - +- LogicalProject($f0=[TUMBLE($1, 3000:INTERVAL SECOND)], c=[$2], a=[$0]) + +- LogicalProject($f0=[$TUMBLE($1, 3000:INTERVAL SECOND)], c=[$2], a=[$0]) +- LogicalTableScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c, d)]]]) ]]> </Resource> @@ -493,7 +345,7 @@ HashWindowAggregate(window=[TumblingGroupWindow('w$, b, 3000)], select=[Final_AV <![CDATA[ LogicalProject(EXPR$0=[$1], EXPR$1=[$2]) +- LogicalAggregate(group=[{0}], EXPR$0=[AVG($1)], EXPR$1=[SUM($2)]) - +- LogicalProject($f0=[TUMBLE($1, 3000:INTERVAL SECOND)], c=[$2], a=[$0]) + +- LogicalProject($f0=[$TUMBLE($1, 3000:INTERVAL SECOND)], c=[$2], a=[$0]) +- LogicalTableScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c, d)]]]) ]]> </Resource> @@ -514,7 +366,7 @@ HashWindowAggregate(window=[TumblingGroupWindow('w$, b, 3000)], select=[AVG(c) A <![CDATA[ LogicalProject(EXPR$0=[$1], EXPR$1=[$2]) +- LogicalAggregate(group=[{0}], EXPR$0=[AVG($1)], EXPR$1=[SUM($2)]) - +- LogicalProject($f0=[TUMBLE($1, 3000:INTERVAL SECOND)], c=[$2], a=[$0]) + +- 
LogicalProject($f0=[$TUMBLE($1, 3000:INTERVAL SECOND)], c=[$2], a=[$0]) +- LogicalTableScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c, d)]]]) ]]> </Resource> @@ -580,7 +432,7 @@ HashWindowAggregate(window=[SlidingGroupWindow('w$, ts, 5400000, 900000)], selec <![CDATA[ LogicalProject(sumA=[$1], cntB=[$2]) +- LogicalAggregate(group=[{0}], sumA=[SUM($1)], cntB=[COUNT($2)]) - +- LogicalProject($f0=[TUMBLE($4, 7200000:INTERVAL HOUR)], a=[$0], b=[$1]) + +- LogicalProject($f0=[$TUMBLE($4, 7200000:INTERVAL HOUR)], a=[$0], b=[$1]) +- LogicalTableScan(table=[[default_catalog, default_database, MyTable2, source: [TestTableSource(a, b, c, d, ts)]]]) ]]> </Resource> @@ -602,7 +454,7 @@ HashWindowAggregate(window=[TumblingGroupWindow('w$, ts, 7200000)], select=[Fina <![CDATA[ LogicalProject(sumA=[$1], cntB=[$2]) +- LogicalAggregate(group=[{0}], sumA=[SUM($1)], cntB=[COUNT($2)]) - +- LogicalProject($f0=[TUMBLE($4, 7200000:INTERVAL HOUR)], a=[$0], b=[$1]) + +- LogicalProject($f0=[$TUMBLE($4, 7200000:INTERVAL HOUR)], a=[$0], b=[$1]) +- LogicalTableScan(table=[[default_catalog, default_database, MyTable2, source: [TestTableSource(a, b, c, d, ts)]]]) ]]> </Resource> @@ -623,7 +475,7 @@ HashWindowAggregate(window=[TumblingGroupWindow('w$, ts, 7200000)], select=[SUM( <![CDATA[ LogicalProject(sumA=[$1], cntB=[$2]) +- LogicalAggregate(group=[{0}], sumA=[SUM($1)], cntB=[COUNT($2)]) - +- LogicalProject($f0=[TUMBLE($4, 7200000:INTERVAL HOUR)], a=[$0], b=[$1]) + +- LogicalProject($f0=[$TUMBLE($4, 7200000:INTERVAL HOUR)], a=[$0], b=[$1]) +- LogicalTableScan(table=[[default_catalog, default_database, MyTable2, source: [TestTableSource(a, b, c, d, ts)]]]) ]]> </Resource> @@ -742,7 +594,7 @@ FROM MyTable2 <![CDATA[ LogicalProject(EXPR$0=[TUMBLE_START($0)], EXPR$1=[TUMBLE_END($0)], EXPR$2=[TUMBLE_ROWTIME($0)], c=[$1], sumA=[$2], minB=[$3]) +- LogicalAggregate(group=[{0, 1}], sumA=[SUM($2)], minB=[MIN($3)]) - +- LogicalProject($f0=[TUMBLE($4, 
240000:INTERVAL MINUTE)], c=[$2], a=[$0], b=[$1]) + +- LogicalProject($f0=[$TUMBLE($4, 240000:INTERVAL MINUTE)], c=[$2], a=[$0], b=[$1]) +- LogicalTableScan(table=[[default_catalog, default_database, MyTable2, source: [TestTableSource(a, b, c, d, ts)]]]) ]]> </Resource> @@ -774,7 +626,7 @@ FROM MyTable2 <![CDATA[ LogicalProject(EXPR$0=[TUMBLE_START($0)], EXPR$1=[TUMBLE_END($0)], EXPR$2=[TUMBLE_ROWTIME($0)], c=[$1], sumA=[$2], minB=[$3]) +- LogicalAggregate(group=[{0, 1}], sumA=[SUM($2)], minB=[MIN($3)]) - +- LogicalProject($f0=[TUMBLE($4, 240000:INTERVAL MINUTE)], c=[$2], a=[$0], b=[$1]) + +- LogicalProject($f0=[$TUMBLE($4, 240000:INTERVAL MINUTE)], c=[$2], a=[$0], b=[$1]) +- LogicalTableScan(table=[[default_catalog, default_database, MyTable2, source: [TestTableSource(a, b, c, d, ts)]]]) ]]> </Resource> @@ -805,7 +657,7 @@ FROM MyTable2 <![CDATA[ LogicalProject(EXPR$0=[TUMBLE_START($0)], EXPR$1=[TUMBLE_END($0)], EXPR$2=[TUMBLE_ROWTIME($0)], c=[$1], sumA=[$2], minB=[$3]) +- LogicalAggregate(group=[{0, 1}], sumA=[SUM($2)], minB=[MIN($3)]) - +- LogicalProject($f0=[TUMBLE($4, 240000:INTERVAL MINUTE)], c=[$2], a=[$0], b=[$1]) + +- LogicalProject($f0=[$TUMBLE($4, 240000:INTERVAL MINUTE)], c=[$2], a=[$0], b=[$1]) +- LogicalTableScan(table=[[default_catalog, default_database, MyTable2, source: [TestTableSource(a, b, c, d, ts)]]]) ]]> </Resource> @@ -841,7 +693,7 @@ GROUP BY TUMBLE(b, INTERVAL '15' MINUTE) <![CDATA[ LogicalProject(s=[$1], a=[$2], wStart=[TUMBLE_START($0)]) +- LogicalAggregate(group=[{0}], s=[SUM($1)], a=[AVG($1)]) - +- LogicalProject($f0=[TUMBLE($1, 900000:INTERVAL MINUTE)], correct=[CASE(=($0, 1), 1, 99)]) + +- LogicalProject($f0=[$TUMBLE($1, 900000:INTERVAL MINUTE)], correct=[CASE(=($0, 1), 1, 99)]) +- LogicalTableScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c, d)]]]) ]]> </Resource> @@ -877,7 +729,7 @@ GROUP BY TUMBLE(b, INTERVAL '15' MINUTE) <![CDATA[ LogicalProject(s=[$1], a=[$2], wStart=[TUMBLE_START($0)]) 
+- LogicalAggregate(group=[{0}], s=[SUM($1)], a=[AVG($1)]) - +- LogicalProject($f0=[TUMBLE($1, 900000:INTERVAL MINUTE)], correct=[CASE(=($0, 1), 1, 99)]) + +- LogicalProject($f0=[$TUMBLE($1, 900000:INTERVAL MINUTE)], correct=[CASE(=($0, 1), 1, 99)]) +- LogicalTableScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c, d)]]]) ]]> </Resource> @@ -912,7 +764,7 @@ GROUP BY TUMBLE(b, INTERVAL '15' MINUTE) <![CDATA[ LogicalProject(s=[$1], a=[$2], wStart=[TUMBLE_START($0)]) +- LogicalAggregate(group=[{0}], s=[SUM($1)], a=[AVG($1)]) - +- LogicalProject($f0=[TUMBLE($1, 900000:INTERVAL MINUTE)], correct=[CASE(=($0, 1), 1, 99)]) + +- LogicalProject($f0=[$TUMBLE($1, 900000:INTERVAL MINUTE)], correct=[CASE(=($0, 1), 1, 99)]) +- LogicalTableScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c, d)]]]) ]]> </Resource> @@ -1260,7 +1112,7 @@ Calc(select=[EXPR$0]) <![CDATA[ LogicalProject(EXPR$0=[$2]) +- LogicalAggregate(group=[{0, 1}], EXPR$0=[COUNT($2)]) - +- LogicalProject(a=[$1], $f1=[TUMBLE($0, 3000:INTERVAL SECOND)], c=[$3]) + +- LogicalProject(a=[$1], $f1=[$TUMBLE($0, 3000:INTERVAL SECOND)], c=[$3]) +- LogicalTableScan(table=[[default_catalog, default_database, MyTable1, source: [TestTableSource(ts, a, b, c)]]]) ]]> </Resource> @@ -1283,7 +1135,7 @@ Calc(select=[EXPR$0]) <![CDATA[ LogicalProject(EXPR$0=[$2]) +- LogicalAggregate(group=[{0, 1}], EXPR$0=[COUNT($2)]) - +- LogicalProject(a=[$1], $f1=[TUMBLE($0, 3000:INTERVAL SECOND)], c=[$3]) + +- LogicalProject(a=[$1], $f1=[$TUMBLE($0, 3000:INTERVAL SECOND)], c=[$3]) +- LogicalTableScan(table=[[default_catalog, default_database, MyTable1, source: [TestTableSource(ts, a, b, c)]]]) ]]> </Resource> @@ -1305,7 +1157,7 @@ Calc(select=[EXPR$0]) <![CDATA[ LogicalProject(EXPR$0=[$2]) +- LogicalAggregate(group=[{0, 1}], EXPR$0=[COUNT($2)]) - +- LogicalProject(a=[$1], $f1=[TUMBLE($0, 3000:INTERVAL SECOND)], c=[$3]) + +- LogicalProject(a=[$1], $f1=[$TUMBLE($0, 
3000:INTERVAL SECOND)], c=[$3]) +- LogicalTableScan(table=[[default_catalog, default_database, MyTable1, source: [TestTableSource(ts, a, b, c)]]]) ]]> </Resource> @@ -1328,7 +1180,7 @@ Calc(select=[EXPR$0]) <![CDATA[ LogicalProject(EXPR$0=[$3], EXPR$1=[$4]) +- LogicalAggregate(group=[{0, 1, 2}], EXPR$0=[AVG($3)], EXPR$1=[COUNT($0)]) - +- LogicalProject(a=[$0], d=[$3], $f2=[TUMBLE($1, 3000:INTERVAL SECOND)], c=[$2]) + +- LogicalProject(a=[$0], d=[$3], $f2=[$TUMBLE($1, 3000:INTERVAL SECOND)], c=[$2]) +- LogicalTableScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c, d)]]]) ]]> </Resource> @@ -1350,7 +1202,7 @@ Calc(select=[EXPR$0, EXPR$1]) <![CDATA[ LogicalProject(EXPR$0=[$3], EXPR$1=[$4]) +- LogicalAggregate(group=[{0, 1, 2}], EXPR$0=[AVG($3)], EXPR$1=[COUNT($0)]) - +- LogicalProject(a=[$0], d=[$3], $f2=[TUMBLE($1, 3000:INTERVAL SECOND)], c=[$2]) + +- LogicalProject(a=[$0], d=[$3], $f2=[$TUMBLE($1, 3000:INTERVAL SECOND)], c=[$2]) +- LogicalTableScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c, d)]]]) ]]> </Resource> @@ -1371,7 +1223,7 @@ Calc(select=[EXPR$0, EXPR$1]) <![CDATA[ LogicalProject(EXPR$0=[$3], EXPR$1=[$4]) +- LogicalAggregate(group=[{0, 1, 2}], EXPR$0=[AVG($3)], EXPR$1=[COUNT($0)]) - +- LogicalProject(a=[$0], d=[$3], $f2=[TUMBLE($1, 3000:INTERVAL SECOND)], c=[$2]) + +- LogicalProject(a=[$0], d=[$3], $f2=[$TUMBLE($1, 3000:INTERVAL SECOND)], c=[$2]) +- LogicalTableScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c, d)]]]) ]]> </Resource> @@ -1385,30 +1237,6 @@ Calc(select=[EXPR$0, EXPR$1]) ]]> </Resource> </TestCase> - <TestCase name="testTumblingWindowWithUdAgg[aggStrategy=AUTO]"> - <Resource name="sql"> - <![CDATA[SELECT weightedAvg(b, a) AS wAvg FROM MyTable2 GROUP BY TUMBLE(ts, INTERVAL '4' MINUTE)]]> - </Resource> - <Resource name="planBefore"> - <![CDATA[ -LogicalProject(wAvg=[$1]) -+- LogicalAggregate(group=[{0}], 
wAvg=[weightedAvg($1, $2)]) - +- LogicalProject($f0=[TUMBLE($4, 240000:INTERVAL MINUTE)], b=[$1], a=[$0]) - +- LogicalTableScan(table=[[default_catalog, default_database, MyTable2, source: [TestTableSource(a, b, c, d, ts)]]]) -]]> - </Resource> - <Resource name="planAfter"> - <![CDATA[ -SortWindowAggregate(window=[TumblingGroupWindow('w$, ts, 240000)], select=[Final_weightedAvg(wAvg) AS wAvg]) -+- Sort(orderBy=[assignedWindow$ ASC]) - +- Exchange(distribution=[single]) - +- LocalSortWindowAggregate(window=[TumblingGroupWindow('w$, ts, 240000)], select=[Partial_weightedAvg(b, a) AS wAvg]) - +- Sort(orderBy=[ts ASC]) - +- Calc(select=[ts, b, a]) - +- TableSourceScan(table=[[default_catalog, default_database, MyTable2, source: [TestTableSource(a, b, c, d, ts)]]], fields=[a, b, c, d, ts]) -]]> - </Resource> - </TestCase> <TestCase name="testTumblingWindowSortAgg1[aggStrategy=AUTO]"> <Resource name="sql"> <![CDATA[SELECT MAX(c) FROM MyTable1 GROUP BY a, TUMBLE(ts, INTERVAL '3' SECOND)]]> @@ -1417,7 +1245,7 @@ SortWindowAggregate(window=[TumblingGroupWindow('w$, ts, 240000)], select=[Final <![CDATA[ LogicalProject(EXPR$0=[$2]) +- LogicalAggregate(group=[{0, 1}], EXPR$0=[MAX($2)]) - +- LogicalProject(a=[$1], $f1=[TUMBLE($0, 3000:INTERVAL SECOND)], c=[$3]) + +- LogicalProject(a=[$1], $f1=[$TUMBLE($0, 3000:INTERVAL SECOND)], c=[$3]) +- LogicalTableScan(table=[[default_catalog, default_database, MyTable1, source: [TestTableSource(ts, a, b, c)]]]) ]]> </Resource> @@ -1442,7 +1270,7 @@ Calc(select=[EXPR$0]) <![CDATA[ LogicalProject(EXPR$0=[$2]) +- LogicalAggregate(group=[{0, 1}], EXPR$0=[MAX($2)]) - +- LogicalProject(a=[$1], $f1=[TUMBLE($0, 3000:INTERVAL SECOND)], c=[$3]) + +- LogicalProject(a=[$1], $f1=[$TUMBLE($0, 3000:INTERVAL SECOND)], c=[$3]) +- LogicalTableScan(table=[[default_catalog, default_database, MyTable1, source: [TestTableSource(ts, a, b, c)]]]) ]]> </Resource> @@ -1465,7 +1293,7 @@ Calc(select=[EXPR$0]) <![CDATA[ LogicalProject(EXPR$0=[$2]) +- 
LogicalAggregate(group=[{0, 1}], EXPR$0=[MAX($2)]) - +- LogicalProject(a=[$1], $f1=[TUMBLE($0, 3000:INTERVAL SECOND)], c=[$3]) + +- LogicalProject(a=[$1], $f1=[$TUMBLE($0, 3000:INTERVAL SECOND)], c=[$3]) +- LogicalTableScan(table=[[default_catalog, default_database, MyTable1, source: [TestTableSource(ts, a, b, c)]]]) ]]> </Resource> @@ -1490,7 +1318,7 @@ Calc(select=[EXPR$0]) <![CDATA[ LogicalProject(EXPR$0=[$3], EXPR$1=[$4]) +- LogicalAggregate(group=[{0, 1, 2}], EXPR$0=[AVG($3)], EXPR$1=[countFun($0)]) - +- LogicalProject(a=[$0], d=[$3], $f2=[TUMBLE($1, 3000:INTERVAL SECOND)], c=[$2]) + +- LogicalProject(a=[$0], d=[$3], $f2=[$TUMBLE($1, 3000:INTERVAL SECOND)], c=[$2]) +- LogicalTableScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c, d)]]]) ]]> </Resource> @@ -1512,7 +1340,7 @@ Calc(select=[EXPR$0, EXPR$1]) <![CDATA[ LogicalProject(EXPR$0=[$3], EXPR$1=[$4]) +- LogicalAggregate(group=[{0, 1, 2}], EXPR$0=[AVG($3)], EXPR$1=[countFun($0)]) - +- LogicalProject(a=[$0], d=[$3], $f2=[TUMBLE($1, 3000:INTERVAL SECOND)], c=[$2]) + +- LogicalProject(a=[$0], d=[$3], $f2=[$TUMBLE($1, 3000:INTERVAL SECOND)], c=[$2]) +- LogicalTableScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c, d)]]]) ]]> </Resource> @@ -1534,7 +1362,7 @@ Calc(select=[EXPR$0, EXPR$1]) <![CDATA[ LogicalProject(EXPR$0=[$3], EXPR$1=[$4]) +- LogicalAggregate(group=[{0, 1, 2}], EXPR$0=[AVG($3)], EXPR$1=[countFun($0)]) - +- LogicalProject(a=[$0], d=[$3], $f2=[TUMBLE($1, 3000:INTERVAL SECOND)], c=[$2]) + +- LogicalProject(a=[$0], d=[$3], $f2=[$TUMBLE($1, 3000:INTERVAL SECOND)], c=[$2]) +- LogicalTableScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c, d)]]]) ]]> </Resource> @@ -1550,26 +1378,27 @@ Calc(select=[EXPR$0, EXPR$1]) ]]> </Resource> </TestCase> - <TestCase name="testWindowEndOnly[aggStrategy=AUTO]"> + <TestCase name="testTumblingWindowWithUdAgg[aggStrategy=AUTO]"> <Resource name="sql"> - 
<![CDATA[SELECT TUMBLE_END(ts, INTERVAL '4' MINUTE) FROM MyTable2 GROUP BY TUMBLE(ts, INTERVAL '4' MINUTE), c]]> + <![CDATA[SELECT weightedAvg(b, a) AS wAvg FROM MyTable2 GROUP BY TUMBLE(ts, INTERVAL '4' MINUTE)]]> </Resource> <Resource name="planBefore"> <![CDATA[ -LogicalProject(EXPR$0=[TUMBLE_END($0)]) -+- LogicalAggregate(group=[{0, 1}]) - +- LogicalProject($f0=[TUMBLE($4, 240000:INTERVAL MINUTE)], c=[$2]) +LogicalProject(wAvg=[$1]) ++- LogicalAggregate(group=[{0}], wAvg=[weightedAvg($1, $2)]) + +- LogicalProject($f0=[$TUMBLE($4, 240000:INTERVAL MINUTE)], b=[$1], a=[$0]) +- LogicalTableScan(table=[[default_catalog, default_database, MyTable2, source: [TestTableSource(a, b, c, d, ts)]]]) ]]> </Resource> <Resource name="planAfter"> <![CDATA[ -Calc(select=[w$end AS EXPR$0]) -+- HashWindowAggregate(groupBy=[c], window=[TumblingGroupWindow('w$, ts, 240000)], properties=[w$start, w$end, w$rowtime], select=[c]) - +- Exchange(distribution=[hash[c]]) - +- LocalHashWindowAggregate(groupBy=[c], window=[TumblingGroupWindow('w$, ts, 240000)], properties=[w$start, w$end, w$rowtime], select=[c]) - +- Calc(select=[ts, c]) - +- TableSourceScan(table=[[default_catalog, default_database, MyTable2, source: [TestTableSource(a, b, c, d, ts)]]], fields=[a, b, c, d, ts]) +SortWindowAggregate(window=[TumblingGroupWindow('w$, ts, 240000)], select=[Final_weightedAvg(wAvg) AS wAvg]) ++- Sort(orderBy=[assignedWindow$ ASC]) + +- Exchange(distribution=[single]) + +- LocalSortWindowAggregate(window=[TumblingGroupWindow('w$, ts, 240000)], select=[Partial_weightedAvg(b, a) AS wAvg]) + +- Sort(orderBy=[ts ASC]) + +- Calc(select=[ts, b, a]) + +- TableSourceScan(table=[[default_catalog, default_database, MyTable2, source: [TestTableSource(a, b, c, d, ts)]]], fields=[a, b, c, d, ts]) ]]> </Resource> </TestCase> @@ -1581,7 +1410,7 @@ Calc(select=[w$end AS EXPR$0]) <![CDATA[ LogicalProject(wAvg=[$1]) +- LogicalAggregate(group=[{0}], wAvg=[weightedAvg($1, $2)]) - +- LogicalProject($f0=[TUMBLE($4, 
240000:INTERVAL MINUTE)], b=[$1], a=[$0]) + +- LogicalProject($f0=[$TUMBLE($4, 240000:INTERVAL MINUTE)], b=[$1], a=[$0]) +- LogicalTableScan(table=[[default_catalog, default_database, MyTable2, source: [TestTableSource(a, b, c, d, ts)]]]) ]]> </Resource> @@ -1595,6 +1424,151 @@ SortWindowAggregate(window=[TumblingGroupWindow('w$, ts, 240000)], select=[weigh ]]> </Resource> </TestCase> + <TestCase name="testTumblingWindowWithUdAgg[aggStrategy=TWO_PHASE]"> + <Resource name="sql"> + <![CDATA[SELECT weightedAvg(b, a) AS wAvg FROM MyTable2 GROUP BY TUMBLE(ts, INTERVAL '4' MINUTE)]]> + </Resource> + <Resource name="planBefore"> + <![CDATA[ +LogicalProject(wAvg=[$1]) ++- LogicalAggregate(group=[{0}], wAvg=[weightedAvg($1, $2)]) + +- LogicalProject($f0=[$TUMBLE($4, 240000:INTERVAL MINUTE)], b=[$1], a=[$0]) + +- LogicalTableScan(table=[[default_catalog, default_database, MyTable2, source: [TestTableSource(a, b, c, d, ts)]]]) +]]> + </Resource> + <Resource name="planAfter"> + <![CDATA[ +SortWindowAggregate(window=[TumblingGroupWindow('w$, ts, 240000)], select=[Final_weightedAvg(wAvg) AS wAvg]) ++- Sort(orderBy=[assignedWindow$ ASC]) + +- Exchange(distribution=[single]) + +- LocalSortWindowAggregate(window=[TumblingGroupWindow('w$, ts, 240000)], select=[Partial_weightedAvg(b, a) AS wAvg]) + +- Sort(orderBy=[ts ASC]) + +- Calc(select=[ts, b, a]) + +- TableSourceScan(table=[[default_catalog, default_database, MyTable2, source: [TestTableSource(a, b, c, d, ts)]]], fields=[a, b, c, d, ts]) +]]> + </Resource> + </TestCase> + <TestCase name="testWindowAggregateWithDifferentWindows[aggStrategy=AUTO]"> + <Resource name="sql"> + <![CDATA[ +WITH window_1h AS ( + SELECT 1 + FROM MyTable2 + GROUP BY HOP(`ts`, INTERVAL '1' HOUR, INTERVAL '1' HOUR) +), + +window_2h AS ( + SELECT 1 + FROM MyTable2 + GROUP BY HOP(`ts`, INTERVAL '1' HOUR, INTERVAL '2' HOUR) +) + +(SELECT * FROM window_1h) +UNION ALL +(SELECT * FROM window_2h) +]]> + </Resource> + <Resource name="planBefore"> + <![CDATA[ 
+LogicalUnion(all=[true]) +:- LogicalProject(EXPR$0=[1]) +: +- LogicalAggregate(group=[{0}]) +: +- LogicalProject($f0=[HOP($4, 3600000:INTERVAL HOUR, 3600000:INTERVAL HOUR)]) +: +- LogicalTableScan(table=[[default_catalog, default_database, MyTable2, source: [TestTableSource(a, b, c, d, ts)]]]) ++- LogicalProject(EXPR$0=[1]) + +- LogicalAggregate(group=[{0}]) + +- LogicalProject($f0=[HOP($4, 3600000:INTERVAL HOUR, 7200000:INTERVAL HOUR)]) + +- LogicalTableScan(table=[[default_catalog, default_database, MyTable2, source: [TestTableSource(a, b, c, d, ts)]]]) +]]> + </Resource> + <Resource name="planAfter"> + <![CDATA[ +Union(all=[true], union=[EXPR$0]) +:- Calc(select=[1 AS EXPR$0]) +: +- HashWindowAggregate(window=[SlidingGroupWindow('w$, ts, 3600000, 3600000)], select=[]) +: +- Exchange(distribution=[single]) +: +- LocalHashWindowAggregate(window=[SlidingGroupWindow('w$, ts, 3600000, 3600000)], select=[]) +: +- Calc(select=[ts], reuse_id=[1]) +: +- TableSourceScan(table=[[default_catalog, default_database, MyTable2, source: [TestTableSource(a, b, c, d, ts)]]], fields=[a, b, c, d, ts]) ++- Calc(select=[1 AS EXPR$0]) + +- HashWindowAggregate(window=[SlidingGroupWindow('w$, ts, 7200000, 3600000)], select=[]) + +- Exchange(distribution=[single]) + +- LocalHashWindowAggregate(window=[SlidingGroupWindow('w$, ts, 7200000, 3600000)], select=[]) + +- Reused(reference_id=[1]) +]]> + </Resource> + </TestCase> + <TestCase name="testWindowEndOnly[aggStrategy=AUTO]"> + <Resource name="sql"> + <![CDATA[SELECT TUMBLE_END(ts, INTERVAL '4' MINUTE) FROM MyTable2 GROUP BY TUMBLE(ts, INTERVAL '4' MINUTE), c]]> + </Resource> + <Resource name="planBefore"> + <![CDATA[ +LogicalProject(EXPR$0=[TUMBLE_END($0)]) ++- LogicalAggregate(group=[{0, 1}]) + +- LogicalProject($f0=[$TUMBLE($4, 240000:INTERVAL MINUTE)], c=[$2]) + +- LogicalTableScan(table=[[default_catalog, default_database, MyTable2, source: [TestTableSource(a, b, c, d, ts)]]]) +]]> + </Resource> + <Resource name="planAfter"> + 
<![CDATA[ +Calc(select=[w$end AS EXPR$0]) ++- HashWindowAggregate(groupBy=[c], window=[TumblingGroupWindow('w$, ts, 240000)], properties=[w$start, w$end, w$rowtime], select=[c]) + +- Exchange(distribution=[hash[c]]) + +- LocalHashWindowAggregate(groupBy=[c], window=[TumblingGroupWindow('w$, ts, 240000)], properties=[w$start, w$end, w$rowtime], select=[c]) + +- Calc(select=[ts, c]) + +- TableSourceScan(table=[[default_catalog, default_database, MyTable2, source: [TestTableSource(a, b, c, d, ts)]]], fields=[a, b, c, d, ts]) +]]> + </Resource> + </TestCase> + <TestCase name="testWindowAggregateWithDifferentWindows[aggStrategy=ONE_PHASE]"> + <Resource name="sql"> + <![CDATA[ +WITH window_1h AS ( + SELECT 1 + FROM MyTable2 + GROUP BY HOP(`ts`, INTERVAL '1' HOUR, INTERVAL '1' HOUR) +), + +window_2h AS ( + SELECT 1 + FROM MyTable2 + GROUP BY HOP(`ts`, INTERVAL '1' HOUR, INTERVAL '2' HOUR) +) + +(SELECT * FROM window_1h) +UNION ALL +(SELECT * FROM window_2h) +]]> + </Resource> + <Resource name="planBefore"> + <![CDATA[ +LogicalUnion(all=[true]) +:- LogicalProject(EXPR$0=[1]) +: +- LogicalAggregate(group=[{0}]) +: +- LogicalProject($f0=[HOP($4, 3600000:INTERVAL HOUR, 3600000:INTERVAL HOUR)]) +: +- LogicalTableScan(table=[[default_catalog, default_database, MyTable2, source: [TestTableSource(a, b, c, d, ts)]]]) ++- LogicalProject(EXPR$0=[1]) + +- LogicalAggregate(group=[{0}]) + +- LogicalProject($f0=[HOP($4, 3600000:INTERVAL HOUR, 7200000:INTERVAL HOUR)]) + +- LogicalTableScan(table=[[default_catalog, default_database, MyTable2, source: [TestTableSource(a, b, c, d, ts)]]]) +]]> + </Resource> + <Resource name="planAfter"> + <![CDATA[ +Union(all=[true], union=[EXPR$0]) +:- Calc(select=[1 AS EXPR$0]) +: +- HashWindowAggregate(window=[SlidingGroupWindow('w$, ts, 3600000, 3600000)], select=[]) +: +- Exchange(distribution=[single], reuse_id=[1]) +: +- Calc(select=[ts]) +: +- TableSourceScan(table=[[default_catalog, default_database, MyTable2, source: [TestTableSource(a, b, c, d, 
ts)]]], fields=[a, b, c, d, ts]) ++- Calc(select=[1 AS EXPR$0]) + +- SortWindowAggregate(window=[SlidingGroupWindow('w$, ts, 7200000, 3600000)], select=[]) + +- Sort(orderBy=[ts ASC]) + +- Reused(reference_id=[1]) +]]> + </Resource> + </TestCase> <TestCase name="testWindowEndOnly[aggStrategy=ONE_PHASE]"> <Resource name="sql"> <![CDATA[SELECT TUMBLE_END(ts, INTERVAL '4' MINUTE) FROM MyTable2 GROUP BY TUMBLE(ts, INTERVAL '4' MINUTE), c]]> @@ -1603,7 +1577,7 @@ SortWindowAggregate(window=[TumblingGroupWindow('w$, ts, 240000)], select=[weigh <![CDATA[ LogicalProject(EXPR$0=[TUMBLE_END($0)]) +- LogicalAggregate(group=[{0, 1}]) - +- LogicalProject($f0=[TUMBLE($4, 240000:INTERVAL MINUTE)], c=[$2]) + +- LogicalProject($f0=[$TUMBLE($4, 240000:INTERVAL MINUTE)], c=[$2]) +- LogicalTableScan(table=[[default_catalog, default_database, MyTable2, source: [TestTableSource(a, b, c, d, ts)]]]) ]]> </Resource> @@ -1617,27 +1591,53 @@ Calc(select=[w$end AS EXPR$0]) ]]> </Resource> </TestCase> - <TestCase name="testTumblingWindowWithUdAgg[aggStrategy=TWO_PHASE]"> + <TestCase name="testWindowAggregateWithDifferentWindows[aggStrategy=TWO_PHASE]"> <Resource name="sql"> - <![CDATA[SELECT weightedAvg(b, a) AS wAvg FROM MyTable2 GROUP BY TUMBLE(ts, INTERVAL '4' MINUTE)]]> + <![CDATA[ +WITH window_1h AS ( + SELECT 1 + FROM MyTable2 + GROUP BY HOP(`ts`, INTERVAL '1' HOUR, INTERVAL '1' HOUR) +), + +window_2h AS ( + SELECT 1 + FROM MyTable2 + GROUP BY HOP(`ts`, INTERVAL '1' HOUR, INTERVAL '2' HOUR) +) + +(SELECT * FROM window_1h) +UNION ALL +(SELECT * FROM window_2h) +]]> </Resource> <Resource name="planBefore"> <![CDATA[ -LogicalProject(wAvg=[$1]) -+- LogicalAggregate(group=[{0}], wAvg=[weightedAvg($1, $2)]) - +- LogicalProject($f0=[TUMBLE($4, 240000:INTERVAL MINUTE)], b=[$1], a=[$0]) - +- LogicalTableScan(table=[[default_catalog, default_database, MyTable2, source: [TestTableSource(a, b, c, d, ts)]]]) +LogicalUnion(all=[true]) +:- LogicalProject(EXPR$0=[1]) +: +- LogicalAggregate(group=[{0}]) 
+: +- LogicalProject($f0=[HOP($4, 3600000:INTERVAL HOUR, 3600000:INTERVAL HOUR)]) +: +- LogicalTableScan(table=[[default_catalog, default_database, MyTable2, source: [TestTableSource(a, b, c, d, ts)]]]) ++- LogicalProject(EXPR$0=[1]) + +- LogicalAggregate(group=[{0}]) + +- LogicalProject($f0=[HOP($4, 3600000:INTERVAL HOUR, 7200000:INTERVAL HOUR)]) + +- LogicalTableScan(table=[[default_catalog, default_database, MyTable2, source: [TestTableSource(a, b, c, d, ts)]]]) ]]> </Resource> <Resource name="planAfter"> <![CDATA[ -SortWindowAggregate(window=[TumblingGroupWindow('w$, ts, 240000)], select=[Final_weightedAvg(wAvg) AS wAvg]) -+- Sort(orderBy=[assignedWindow$ ASC]) - +- Exchange(distribution=[single]) - +- LocalSortWindowAggregate(window=[TumblingGroupWindow('w$, ts, 240000)], select=[Partial_weightedAvg(b, a) AS wAvg]) - +- Sort(orderBy=[ts ASC]) - +- Calc(select=[ts, b, a]) - +- TableSourceScan(table=[[default_catalog, default_database, MyTable2, source: [TestTableSource(a, b, c, d, ts)]]], fields=[a, b, c, d, ts]) +Union(all=[true], union=[EXPR$0]) +:- Calc(select=[1 AS EXPR$0]) +: +- HashWindowAggregate(window=[SlidingGroupWindow('w$, ts, 3600000, 3600000)], select=[]) +: +- Exchange(distribution=[single]) +: +- LocalHashWindowAggregate(window=[SlidingGroupWindow('w$, ts, 3600000, 3600000)], select=[]) +: +- Calc(select=[ts], reuse_id=[1]) +: +- TableSourceScan(table=[[default_catalog, default_database, MyTable2, source: [TestTableSource(a, b, c, d, ts)]]], fields=[a, b, c, d, ts]) ++- Calc(select=[1 AS EXPR$0]) + +- HashWindowAggregate(window=[SlidingGroupWindow('w$, ts, 7200000, 3600000)], select=[]) + +- Exchange(distribution=[single]) + +- LocalHashWindowAggregate(window=[SlidingGroupWindow('w$, ts, 7200000, 3600000)], select=[]) + +- Reused(reference_id=[1]) ]]> </Resource> </TestCase> @@ -1649,7 +1649,7 @@ SortWindowAggregate(window=[TumblingGroupWindow('w$, ts, 240000)], select=[Final <![CDATA[ LogicalProject(EXPR$0=[TUMBLE_END($0)]) +- 
LogicalAggregate(group=[{0, 1}]) - +- LogicalProject($f0=[TUMBLE($4, 240000:INTERVAL MINUTE)], c=[$2]) + +- LogicalProject($f0=[$TUMBLE($4, 240000:INTERVAL MINUTE)], c=[$2]) +- LogicalTableScan(table=[[default_catalog, default_database, MyTable2, source: [TestTableSource(a, b, c, d, ts)]]]) ]]> </Resource> diff --git a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/AggregateReduceGroupingRuleTest.xml b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/AggregateReduceGroupingRuleTest.xml index 14b9c6e..09639d3 100644 --- a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/AggregateReduceGroupingRuleTest.xml +++ b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/AggregateReduceGroupingRuleTest.xml @@ -324,7 +324,7 @@ FlinkLogicalAggregate(group=[{0, 1, 2}], b1=[AUXILIARY_GROUP($3)], EXPR$4=[COUNT <![CDATA[ LogicalProject(a4=[$0], b4=[$1], EXPR$2=[$3]) +- LogicalAggregate(group=[{0, 1, 2}], EXPR$2=[COUNT($3)]) - +- LogicalProject(a4=[$0], b4=[$1], $f2=[TUMBLE($3, 900000:INTERVAL MINUTE)], c4=[$2]) + +- LogicalProject(a4=[$0], b4=[$1], $f2=[$TUMBLE($3, 900000:INTERVAL MINUTE)], c4=[$2]) +- LogicalTableScan(table=[[default_catalog, default_database, T4, source: [TestTableSource(a4, b4, c4, d4)]]]) ]]> </Resource> @@ -343,7 +343,7 @@ FlinkLogicalWindowAggregate(group=[{0}], b4=[AUXILIARY_GROUP($1)], EXPR$2=[COUNT <![CDATA[ LogicalProject(a4=[$0], c4=[$1], EXPR$2=[$3], EXPR$3=[$4]) +- LogicalAggregate(group=[{0, 1, 2}], EXPR$2=[COUNT($3)], EXPR$3=[AVG($3)]) - +- LogicalProject(a4=[$0], c4=[$2], $f2=[TUMBLE($3, 900000:INTERVAL MINUTE)], b4=[$1]) + +- LogicalProject(a4=[$0], c4=[$2], $f2=[$TUMBLE($3, 900000:INTERVAL MINUTE)], b4=[$1]) +- LogicalTableScan(table=[[default_catalog, default_database, T4, source: [TestTableSource(a4, b4, c4, d4)]]]) 
]]> </Resource> @@ -363,7 +363,7 @@ FlinkLogicalWindowAggregate(group=[{0}], c4=[AUXILIARY_GROUP($2)], EXPR$2=[COUNT LogicalAggregate(group=[{0, 1, 2}], EXPR$3=[COUNT($3)]) +- LogicalProject(a4=[$0], c4=[$1], s=[TUMBLE_START($2)], b4=[$3]) +- LogicalAggregate(group=[{0, 1, 2}], b4=[VAR_POP($3)]) - +- LogicalProject(a4=[$0], c4=[$2], $f2=[TUMBLE($3, 900000:INTERVAL MINUTE)], b4=[$1]) + +- LogicalProject(a4=[$0], c4=[$2], $f2=[$TUMBLE($3, 900000:INTERVAL MINUTE)], b4=[$1]) +- LogicalTableScan(table=[[default_catalog, default_database, T4, source: [TestTableSource(a4, b4, c4, d4)]]]) ]]> </Resource> @@ -387,7 +387,7 @@ FlinkLogicalCalc(select=[a4, c4, s, EXPR$3]) LogicalAggregate(group=[{0, 1, 2}], EXPR$3=[COUNT($3)]) +- LogicalProject(a4=[$0], c4=[$1], e=[TUMBLE_END($2)], b4=[$3]) +- LogicalAggregate(group=[{0, 1, 2}], b4=[VAR_POP($3)]) - +- LogicalProject(a4=[$0], c4=[$2], $f2=[TUMBLE($3, 900000:INTERVAL MINUTE)], b4=[$1]) + +- LogicalProject(a4=[$0], c4=[$2], $f2=[$TUMBLE($3, 900000:INTERVAL MINUTE)], b4=[$1]) +- LogicalTableScan(table=[[default_catalog, default_database, T4, source: [TestTableSource(a4, b4, c4, d4)]]]) ]]> </Resource> @@ -411,7 +411,7 @@ FlinkLogicalCalc(select=[a4, c4, e, EXPR$3]) LogicalAggregate(group=[{0, 1, 2}], EXPR$3=[COUNT()]) +- LogicalProject(a4=[$0], b4=[$3], c4=[$1]) +- LogicalAggregate(group=[{0, 1, 2}], b4=[VAR_POP($3)]) - +- LogicalProject(a4=[$0], c4=[$2], $f2=[TUMBLE($3, 900000:INTERVAL MINUTE)], b4=[$1]) + +- LogicalProject(a4=[$0], c4=[$2], $f2=[$TUMBLE($3, 900000:INTERVAL MINUTE)], b4=[$1]) +- LogicalTableScan(table=[[default_catalog, default_database, T4, source: [TestTableSource(a4, b4, c4, d4)]]]) ]]> </Resource> @@ -439,7 +439,7 @@ LogicalAggregate(group=[{0, 1, 2}], groups=[[{0, 1, 2}, {0, 1}, {0, 2}, {0}, {1, <![CDATA[ FlinkLogicalCalc(select=[a1, b1, c1, EXPR$3]) +- FlinkLogicalAggregate(group=[{0, 1, 2, 4}], EXPR$3=[COUNT($3)]) - +- FlinkLogicalExpand(projects=[{a1=[$0], b1=[$1], c1=[$2], d1=[$3], $e=[0]}, {a1=[$0], 
b1=[$1], c1=[null], d1=[$3], $e=[1]}, {a1=[$0], b1=[null], c1=[$2], d1=[$3], $e=[2]}, {a1=[$0], b1=[null], c1=[null], d1=[$3], $e=[3]}, {a1=[null], b1=[$1], c1=[$2], d1=[$3], $e=[4]}, {a1=[null], b1=[$1], c1=[null], d1=[$3], $e=[5]}, {a1=[null], b1=[null], c1=[$2], d1=[$3], $e=[6]}, {a1=[null], b1=[null], c1=[null], d1=[$3], $e=[7]}]) + +- FlinkLogicalExpand(projects=[a1, b1, c1, d1, $e]) +- FlinkLogicalTableSourceScan(table=[[default_catalog, default_database, T1, source: [TestTableSource(a1, b1, c1, d1)]]], fields=[a1, b1, c1, d1]) ]]> </Resource> @@ -458,7 +458,7 @@ LogicalAggregate(group=[{0, 1, 2}], groups=[[{0, 1}, {0, 2}]], EXPR$3=[COUNT($3) <![CDATA[ FlinkLogicalCalc(select=[a1, b1, c1, EXPR$3]) +- FlinkLogicalAggregate(group=[{0, 1, 2, 4}], EXPR$3=[COUNT($3)]) - +- FlinkLogicalExpand(projects=[{a1=[$0], b1=[$1], c1=[null], d1=[$3], $e=[1]}, {a1=[$0], b1=[null], c1=[$2], d1=[$3], $e=[2]}]) + +- FlinkLogicalExpand(projects=[a1, b1, c1, d1, $e]) +- FlinkLogicalTableSourceScan(table=[[default_catalog, default_database, T1, source: [TestTableSource(a1, b1, c1, d1)]]], fields=[a1, b1, c1, d1]) ]]> </Resource> @@ -479,7 +479,7 @@ LogicalProject(a1=[$0], s=[$2]) <![CDATA[ FlinkLogicalCalc(select=[a1, s]) +- FlinkLogicalAggregate(group=[{0, 1, 3}], s=[SUM($2)]) - +- FlinkLogicalExpand(projects=[{a1=[$0], c1=[$1], b1=[$2], $e=[0]}, {a1=[$0], c1=[null], b1=[$2], $e=[1]}, {a1=[null], c1=[null], b1=[$2], $e=[3]}]) + +- FlinkLogicalExpand(projects=[a1, c1, b1, $e]) +- FlinkLogicalCalc(select=[a1, c1, b1]) +- FlinkLogicalTableSourceScan(table=[[default_catalog, default_database, T1, source: [TestTableSource(a1, b1, c1, d1)]]], fields=[a1, b1, c1, d1]) ]]> @@ -500,7 +500,7 @@ LogicalProject(a1=[$0], b1=[$1], c1=[$2], EXPR$3=[$4]) <![CDATA[ FlinkLogicalCalc(select=[a1, b1, c1, EXPR$3]) +- FlinkLogicalAggregate(group=[{0, 2, 3, 4}], b1=[AUXILIARY_GROUP($1)], EXPR$3=[COUNT($5)]) - +- FlinkLogicalExpand(projects=[{a1=[$0], b1=[$1], c1=[$2], d1=[null], $e=[1], d1_0=[$3]}, 
{a1=[$0], b1=[$1], c1=[null], d1=[$3], $e=[2], d1_0=[$3]}]) + +- FlinkLogicalExpand(projects=[a1, b1, c1, d1, $e, d1_0]) +- FlinkLogicalTableSourceScan(table=[[default_catalog, default_database, T1, source: [TestTableSource(a1, b1, c1, d1)]]], fields=[a1, b1, c1, d1]) ]]> </Resource> @@ -557,7 +557,7 @@ LogicalAggregate(group=[{0, 1, 2}], groups=[[{0, 1, 2}, {0, 1}, {0}, {}]], EXPR$ <![CDATA[ FlinkLogicalCalc(select=[a1, b1, c1, EXPR$3]) +- FlinkLogicalAggregate(group=[{0, 1, 2, 4}], EXPR$3=[COUNT($3)]) - +- FlinkLogicalExpand(projects=[{a1=[$0], b1=[$1], c1=[$2], d1=[$3], $e=[0]}, {a1=[$0], b1=[$1], c1=[null], d1=[$3], $e=[1]}, {a1=[$0], b1=[null], c1=[null], d1=[$3], $e=[3]}, {a1=[null], b1=[null], c1=[null], d1=[$3], $e=[7]}]) + +- FlinkLogicalExpand(projects=[a1, b1, c1, d1, $e]) +- FlinkLogicalTableSourceScan(table=[[default_catalog, default_database, T1, source: [TestTableSource(a1, b1, c1, d1)]]], fields=[a1, b1, c1, d1]) ]]> </Resource> @@ -635,7 +635,7 @@ LogicalAggregate(group=[{0, 1}], EXPR$2=[COUNT(DISTINCT $2)], EXPR$3=[SUM(DISTIN <![CDATA[ FlinkLogicalAggregate(group=[{0}], d1=[AUXILIARY_GROUP($3)], EXPR$2=[COUNT($2) FILTER $5], EXPR$3=[SUM($1) FILTER $4]) +- FlinkLogicalCalc(select=[a1, b1, c1, d1, =(CASE(=($e, 2:BIGINT), 2:BIGINT, 4:BIGINT), 2) AS $g_2, =(CASE(=($e, 2:BIGINT), 2:BIGINT, 4:BIGINT), 4) AS $g_4]) - +- FlinkLogicalExpand(projects=[{a1=[$0], b1=[$1], c1=[null], d1=[$3], $e=[2]}, {a1=[$0], b1=[null], c1=[$2], d1=[$3], $e=[4]}]) + +- FlinkLogicalExpand(projects=[a1, b1, c1, d1, $e]) +- FlinkLogicalTableSourceScan(table=[[default_catalog, default_database, T1, source: [TestTableSource(a1, b1, c1, d1)]]], fields=[a1, b1, c1, d1]) ]]> </Resource> @@ -656,7 +656,7 @@ LogicalAggregate(group=[{0}], EXPR$1=[SUM(DISTINCT $1)], EXPR$2=[MAX($1)], EXPR$ FlinkLogicalAggregate(group=[{0}], EXPR$1=[SUM($1) FILTER $4], EXPR$2=[MIN($2) FILTER $5], EXPR$3=[MIN($3) FILTER $5]) +- FlinkLogicalCalc(select=[a1, b1, EXPR$2, EXPR$3, =(CASE(=($e, 0:BIGINT), 
0:BIGINT, 1:BIGINT), 0) AS $g_0, =(CASE(=($e, 0:BIGINT), 0:BIGINT, 1:BIGINT), 1) AS $g_1]) +- FlinkLogicalAggregate(group=[{0, 1, 3}], EXPR$2=[MAX($4)], EXPR$3=[MIN($2)]) - +- FlinkLogicalExpand(projects=[{a1=[$0], b1=[$1], c1=[$2], $e=[0], b1_0=[$1]}, {a1=[$0], b1=[null], c1=[$2], $e=[1], b1_0=[$1]}]) + +- FlinkLogicalExpand(projects=[a1, b1, c1, $e, b1_0]) +- FlinkLogicalCalc(select=[a1, b1, c1]) +- FlinkLogicalTableSourceScan(table=[[default_catalog, default_database, T1, source: [TestTableSource(a1, b1, c1, d1)]]], fields=[a1, b1, c1, d1]) ]]> @@ -678,7 +678,7 @@ LogicalAggregate(group=[{0, 1}], EXPR$2=[COUNT(DISTINCT $2)], EXPR$3=[MAX($3)], FlinkLogicalAggregate(group=[{0}], d1=[MIN($2) FILTER $6], EXPR$2=[COUNT($1) FILTER $5], EXPR$3=[MIN($3) FILTER $6], EXPR$4=[MIN($4) FILTER $6]) +- FlinkLogicalCalc(select=[a1, c1, d1, EXPR$3, EXPR$4, =(CASE(=($e, 0:BIGINT), 0:BIGINT, 1:BIGINT), 0) AS $g_0, =(CASE(=($e, 0:BIGINT), 0:BIGINT, 1:BIGINT), 1) AS $g_1]) +- FlinkLogicalAggregate(group=[{0, 2, 4}], d1=[AUXILIARY_GROUP($3)], EXPR$3=[MAX($1)], EXPR$4=[SUM($1)]) - +- FlinkLogicalExpand(projects=[{a1=[$0], b1=[$1], c1=[$2], d1=[$3], $e=[0]}, {a1=[$0], b1=[$1], c1=[null], d1=[$3], $e=[1]}]) + +- FlinkLogicalExpand(projects=[a1, b1, c1, d1, $e]) +- FlinkLogicalTableSourceScan(table=[[default_catalog, default_database, T1, source: [TestTableSource(a1, b1, c1, d1)]]], fields=[a1, b1, c1, d1]) ]]> </Resource> @@ -698,8 +698,8 @@ LogicalProject(EXPR$0=[$2]) <Resource name="planAfter"> <![CDATA[ FlinkLogicalCalc(select=[EXPR$0]) -+- FlinkLogicalAggregate(group=[{0}], EXPR$0=[COUNT($2)]) - +- FlinkLogicalCalc(select=[1 AS $f0, true AS $f1, c1]) ++- FlinkLogicalAggregate(group=[{0}], EXPR$0=[COUNT($1)]) + +- FlinkLogicalCalc(select=[1 AS $f0, c1]) +- FlinkLogicalTableSourceScan(table=[[default_catalog, default_database, T1, source: [TestTableSource(a1, b1, c1, d1)]]], fields=[a1, b1, c1, d1]) ]]> </Resource> @@ -774,8 +774,8 @@ LogicalProject(a1=[$0], b1=[$1], EXPR$2=[$4]) 
</Resource> <Resource name="planAfter"> <![CDATA[ -FlinkLogicalAggregate(group=[{0}], b1=[AUXILIARY_GROUP($1)], EXPR$2=[COUNT($4)]) -+- FlinkLogicalCalc(select=[a1, b1, 1 AS $f2, true AS $f3, c1]) +FlinkLogicalAggregate(group=[{0}], b1=[AUXILIARY_GROUP($1)], EXPR$2=[COUNT($2)]) ++- FlinkLogicalCalc(select=[a1, b1, c1]) +- FlinkLogicalTableSourceScan(table=[[default_catalog, default_database, T1, source: [TestTableSource(a1, b1, c1, d1)]]], fields=[a1, b1, c1, d1]) ]]> </Resource> @@ -836,7 +836,7 @@ LogicalAggregate(group=[{0}], EXPR$1=[COUNT(DISTINCT $1)], EXPR$2=[SUM($1)]) FlinkLogicalAggregate(group=[{0}], EXPR$1=[COUNT($1) FILTER $3], EXPR$2=[MIN($2) FILTER $4]) +- FlinkLogicalCalc(select=[a1, b1, EXPR$2, =(CASE(=($e, 0:BIGINT), 0:BIGINT, 1:BIGINT), 0) AS $g_0, =(CASE(=($e, 0:BIGINT), 0:BIGINT, 1:BIGINT), 1) AS $g_1]) +- FlinkLogicalAggregate(group=[{0, 1, 2}], EXPR$2=[SUM($3)]) - +- FlinkLogicalExpand(projects=[{a1=[$0], b1=[$1], $e=[0], b1_0=[$1]}, {a1=[$0], b1=[null], $e=[1], b1_0=[$1]}]) + +- FlinkLogicalExpand(projects=[a1, b1, $e, b1_0]) +- FlinkLogicalCalc(select=[a1, b1]) +- FlinkLogicalTableSourceScan(table=[[default_catalog, default_database, T1, source: [TestTableSource(a1, b1, c1, d1)]]], fields=[a1, b1, c1, d1]) ]]> @@ -858,7 +858,7 @@ LogicalAggregate(group=[{0, 1}], EXPR$2=[COUNT(DISTINCT $2)], EXPR$3=[SUM($2)]) FlinkLogicalAggregate(group=[{0}], c1=[AUXILIARY_GROUP($1)], EXPR$2=[COUNT($2) FILTER $4], EXPR$3=[MIN($3) FILTER $5]) +- FlinkLogicalCalc(select=[a1, c1, b1, EXPR$3, =(CASE(=($e, 0:BIGINT), 0:BIGINT, 1:BIGINT), 0) AS $g_0, =(CASE(=($e, 0:BIGINT), 0:BIGINT, 1:BIGINT), 1) AS $g_1]) +- FlinkLogicalAggregate(group=[{0, 2, 3}], c1=[AUXILIARY_GROUP($1)], EXPR$3=[SUM($4)]) - +- FlinkLogicalExpand(projects=[{a1=[$0], c1=[$1], b1=[$2], $e=[0], b1_0=[$2]}, {a1=[$0], c1=[$1], b1=[null], $e=[1], b1_0=[$2]}]) + +- FlinkLogicalExpand(projects=[a1, c1, b1, $e, b1_0]) +- FlinkLogicalCalc(select=[a1, c1, b1]) +- 
FlinkLogicalTableSourceScan(table=[[default_catalog, default_database, T1, source: [TestTableSource(a1, b1, c1, d1)]]], fields=[a1, b1, c1, d1]) ]]> @@ -880,7 +880,7 @@ LogicalAggregate(group=[{0}], EXPR$1=[COUNT(DISTINCT $1)], EXPR$2=[SUM($2)]) FlinkLogicalAggregate(group=[{0}], EXPR$1=[COUNT($1) FILTER $3], EXPR$2=[MIN($2) FILTER $4]) +- FlinkLogicalCalc(select=[a1, c1, EXPR$2, =(CASE(=($e, 0:BIGINT), 0:BIGINT, 1:BIGINT), 0) AS $g_0, =(CASE(=($e, 0:BIGINT), 0:BIGINT, 1:BIGINT), 1) AS $g_1]) +- FlinkLogicalAggregate(group=[{0, 1, 3}], EXPR$2=[SUM($2)]) - +- FlinkLogicalExpand(projects=[{a1=[$0], c1=[$1], b1=[$2], $e=[0]}, {a1=[$0], c1=[null], b1=[$2], $e=[1]}]) + +- FlinkLogicalExpand(projects=[a1, c1, b1, $e]) +- FlinkLogicalCalc(select=[a1, c1, b1]) +- FlinkLogicalTableSourceScan(table=[[default_catalog, default_database, T1, source: [TestTableSource(a1, b1, c1, d1)]]], fields=[a1, b1, c1, d1]) ]]> @@ -902,7 +902,7 @@ LogicalAggregate(group=[{0, 1}], EXPR$2=[COUNT(DISTINCT $2)], EXPR$3=[SUM($3)]) FlinkLogicalAggregate(group=[{0}], d1=[MIN($2) FILTER $5], EXPR$2=[COUNT($1) FILTER $4], EXPR$3=[MIN($3) FILTER $5]) +- FlinkLogicalCalc(select=[a1, c1, d1, EXPR$3, =(CASE(=($e, 0:BIGINT), 0:BIGINT, 1:BIGINT), 0) AS $g_0, =(CASE(=($e, 0:BIGINT), 0:BIGINT, 1:BIGINT), 1) AS $g_1]) +- FlinkLogicalAggregate(group=[{0, 2, 4}], d1=[AUXILIARY_GROUP($3)], EXPR$3=[SUM($1)]) - +- FlinkLogicalExpand(projects=[{a1=[$0], b1=[$1], c1=[$2], d1=[$3], $e=[0]}, {a1=[$0], b1=[$1], c1=[null], d1=[$3], $e=[1]}]) + +- FlinkLogicalExpand(projects=[a1, b1, c1, d1, $e]) +- FlinkLogicalTableSourceScan(table=[[default_catalog, default_database, T1, source: [TestTableSource(a1, b1, c1, d1)]]], fields=[a1, b1, c1, d1]) ]]> </Resource> diff --git a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/MiniBatchIntervalInferTest.xml 
b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/MiniBatchIntervalInferTest.xml index c2a132a..02c6a23 100644 --- a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/MiniBatchIntervalInferTest.xml +++ b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/MiniBatchIntervalInferTest.xml @@ -30,7 +30,7 @@ GROUP BY long, TUMBLE(rowtime, INTERVAL '10' SECOND) <![CDATA[ LogicalProject(long=[$0], cnt=[$2], rt=[TUMBLE_END($1)]) +- LogicalAggregate(group=[{0, 1}], cnt=[COUNT($2)]) - +- LogicalProject(long=[$0], $f1=[TUMBLE($3, 10000:INTERVAL SECOND)], str=[$2]) + +- LogicalProject(long=[$0], $f1=[$TUMBLE($3, 10000:INTERVAL SECOND)], str=[$2]) +- LogicalTableScan(table=[[default_catalog, default_database, T1]]) ]]> </Resource> @@ -88,7 +88,7 @@ GlobalGroupAggregate(groupBy=[b], select=[b, COUNT(distinct$0 count$0) AS EXPR$1 <![CDATA[ LogicalProject(b=[$0], EXPR$1=[$2], EXPR$2=[TUMBLE_START($1)], EXPR$3=[TUMBLE_END($1)]) +- LogicalAggregate(group=[{0, 1}], EXPR$1=[COUNT($2)]) - +- LogicalProject(b=[$1], $f1=[TUMBLE($4, 5000:INTERVAL SECOND)], a=[$0]) + +- LogicalProject(b=[$1], $f1=[$TUMBLE($4, 5000:INTERVAL SECOND)], a=[$0]) +- LogicalJoin(condition=[AND(=($0, $5), >=($4, -($9, 5000:INTERVAL SECOND)), <=($4, +($9, 10000:INTERVAL SECOND)))], joinType=[inner]) :- LogicalWatermarkAssigner(rowtime=[rowtime], watermark=[-($4, 0:INTERVAL MILLISECOND)]) : +- LogicalTableScan(table=[[default_catalog, default_database, MyTable1]]) @@ -149,7 +149,7 @@ LogicalProject(b=[$0], EXPR$1=[COUNT($1) OVER (PARTITION BY $0 ORDER BY $2 NULLS +- LogicalJoin(condition=[AND(=($1, $4), >=($2, -($5, 5000:INTERVAL SECOND)), <=($2, +($5, 10000:INTERVAL SECOND)))], joinType=[inner]) :- LogicalProject(b=[$0], a=[$2], rt=[TUMBLE_ROWTIME($1)]) : +- LogicalAggregate(group=[{0, 1}], a=[COUNT($2)]) - : +- LogicalProject(b=[$1], $f1=[TUMBLE($4, 
5000:INTERVAL SECOND)], a=[$0]) + : +- LogicalProject(b=[$1], $f1=[$TUMBLE($4, 5000:INTERVAL SECOND)], a=[$0]) : +- LogicalWatermarkAssigner(rowtime=[rowtime], watermark=[-($4, 0:INTERVAL MILLISECOND)]) : +- LogicalTableScan(table=[[default_catalog, default_database, MyTable1]]) +- LogicalProject(b=[$0], a=[$2], rt=[HOP_ROWTIME($1)]) @@ -251,7 +251,7 @@ LogicalSink(name=[`default_catalog`.`default_database`.`appendSink1`], fields=[a +- LogicalAggregate(group=[{0, 1}], EXPR$1=[LISTAGG($2, $3)]) +- LogicalProject($f0=[HOP(TUMBLE_ROWTIME($0), 12000:INTERVAL SECOND, 4000:INTERVAL SECOND)], id1=[$1], text=[$2], $f3=[_UTF-16LE'*']) +- LogicalAggregate(group=[{0, 1}], text=[LISTAGG($2, $3)]) - +- LogicalProject($f0=[TUMBLE($1, 6000:INTERVAL SECOND)], id1=[$0], text=[$2], $f3=[_UTF-16LE'#']) + +- LogicalProject($f0=[$TUMBLE($1, 6000:INTERVAL SECOND)], id1=[$0], text=[$2], $f3=[_UTF-16LE'#']) +- LogicalFilter(condition=[AND(=($0, $3), >($1, -($4, 300000:INTERVAL MINUTE)), <($1, +($4, 180000:INTERVAL MINUTE)))]) +- LogicalJoin(condition=[true], joinType=[inner]) :- LogicalWatermarkAssigner(rowtime=[rowtime], watermark=[-($1, 0:INTERVAL MILLISECOND)]) @@ -262,7 +262,7 @@ LogicalSink(name=[`default_catalog`.`default_database`.`appendSink1`], fields=[a LogicalSink(name=[`default_catalog`.`default_database`.`appendSink2`], fields=[a, b]) +- LogicalProject(id1=[$1], EXPR$1=[$2]) +- LogicalAggregate(group=[{0, 1}], EXPR$1=[LISTAGG($2, $3)]) - +- LogicalProject($f0=[TUMBLE($1, 9000:INTERVAL SECOND)], id1=[$0], text=[$2], $f3=[_UTF-16LE'-']) + +- LogicalProject($f0=[$TUMBLE($1, 9000:INTERVAL SECOND)], id1=[$0], text=[$2], $f3=[_UTF-16LE'-']) +- LogicalFilter(condition=[AND(=($0, $3), >($1, -($4, 300000:INTERVAL MINUTE)), <($1, +($4, 180000:INTERVAL MINUTE)))]) +- LogicalJoin(condition=[true], joinType=[inner]) :- LogicalWatermarkAssigner(rowtime=[rowtime], watermark=[-($1, 0:INTERVAL MILLISECOND)]) @@ -274,7 +274,7 @@ 
LogicalSink(name=[`default_catalog`.`default_database`.`appendSink3`], fields=[a +- LogicalAggregate(group=[{0}], EXPR$1=[COUNT($1)]) +- LogicalProject(id1=[$1], text=[$2]) +- LogicalAggregate(group=[{0, 1}], text=[LISTAGG($2, $3)]) - +- LogicalProject($f0=[TUMBLE($1, 6000:INTERVAL SECOND)], id1=[$0], text=[$2], $f3=[_UTF-16LE'#']) + +- LogicalProject($f0=[$TUMBLE($1, 6000:INTERVAL SECOND)], id1=[$0], text=[$2], $f3=[_UTF-16LE'#']) +- LogicalFilter(condition=[AND(=($0, $3), >($1, -($4, 300000:INTERVAL MINUTE)), <($1, +($4, 180000:INTERVAL MINUTE)))]) +- LogicalJoin(condition=[true], joinType=[inner]) :- LogicalWatermarkAssigner(rowtime=[rowtime], watermark=[-($1, 0:INTERVAL MILLISECOND)]) @@ -610,9 +610,9 @@ GlobalGroupAggregate(groupBy=[b], select=[b, COUNT(count$0) AS EXPR$1]) <![CDATA[ LogicalProject(b=[$0], EXPR$1=[$2]) +- LogicalAggregate(group=[{0, 1}], EXPR$1=[SUM($2)]) - +- LogicalProject(b=[$0], $f1=[TUMBLE(TUMBLE_ROWTIME($1), 5000:INTERVAL SECOND)], cnt=[$2]) + +- LogicalProject(b=[$0], $f1=[$TUMBLE(TUMBLE_ROWTIME($1), 5000:INTERVAL SECOND)], cnt=[$2]) +- LogicalAggregate(group=[{0, 1}], cnt=[COUNT($2)]) - +- LogicalProject(b=[$1], $f1=[TUMBLE($4, 10000:INTERVAL SECOND)], a=[$0]) + +- LogicalProject(b=[$1], $f1=[$TUMBLE($4, 10000:INTERVAL SECOND)], a=[$0]) +- LogicalWatermarkAssigner(rowtime=[rowtime], watermark=[-($4, 0:INTERVAL MILLISECOND)]) +- LogicalTableScan(table=[[default_catalog, default_database, MyTable1]]) ]]> diff --git a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/RelTimeIndicatorConverterTest.xml b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/RelTimeIndicatorConverterTest.xml index 6c0fafe..39d6c48 100644 --- a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/RelTimeIndicatorConverterTest.xml +++ 
b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/RelTimeIndicatorConverterTest.xml @@ -139,9 +139,9 @@ SELECT TUMBLE_END(newrowtime, INTERVAL '30' SECOND), long, sum(`int`) FROM ( <![CDATA[ LogicalProject(EXPR$0=[TUMBLE_END($0)], long=[$1], EXPR$2=[$2]) +- LogicalAggregate(group=[{0, 1}], EXPR$2=[SUM($2)]) - +- LogicalProject($f0=[TUMBLE(TUMBLE_ROWTIME($0), 30000:INTERVAL SECOND)], long=[$1], int=[$2]) + +- LogicalProject($f0=[$TUMBLE(TUMBLE_ROWTIME($0), 30000:INTERVAL SECOND)], long=[$1], int=[$2]) +- LogicalAggregate(group=[{0, 1}], int=[SUM($2)]) - +- LogicalProject($f0=[TUMBLE($0, 10000:INTERVAL SECOND)], long=[$1], int=[$2]) + +- LogicalProject($f0=[$TUMBLE($0, 10000:INTERVAL SECOND)], long=[$1], int=[$2]) +- LogicalTableScan(table=[[default_catalog, default_database, MyTable1]]) ]]> </Resource> @@ -229,7 +229,7 @@ GROUP BY long, TUMBLE(rowtime, INTERVAL '0.1' SECOND) <![CDATA[ LogicalProject(EXPR$0=[$2], long=[$0]) +- LogicalAggregate(group=[{0, 1}], EXPR$0=[MIN($2)]) - +- LogicalProject(long=[$1], $f1=[TUMBLE($0, 100:INTERVAL SECOND)], rowtime=[$0]) + +- LogicalProject(long=[$1], $f1=[$TUMBLE($0, 100:INTERVAL SECOND)], rowtime=[$0]) +- LogicalTableScan(table=[[default_catalog, default_database, MyTable1]]) ]]> </Resource> @@ -279,7 +279,7 @@ FROM MyTable1 <![CDATA[ LogicalProject(EXPR$0=[TUMBLE_END($0)], long=[$1], EXPR$2=[$2]) +- LogicalAggregate(group=[{0, 1}], EXPR$2=[SUM($2)]) - +- LogicalProject($f0=[TUMBLE($0, 10000:INTERVAL SECOND)], long=[$1], int=[$2]) + +- LogicalProject($f0=[$TUMBLE($0, 10000:INTERVAL SECOND)], long=[$1], int=[$2]) +- LogicalTableScan(table=[[default_catalog, default_database, MyTable1]]) ]]> </Resource> @@ -305,7 +305,7 @@ HAVING QUARTER(TUMBLE_END(rowtime, INTERVAL '1' SECOND)) = 1 LogicalProject(EXPR$0=[$2], long=[$0]) +- LogicalFilter(condition=[=(EXTRACT(FLAG(QUARTER), TUMBLE_END($1)), 1)]) +- LogicalAggregate(group=[{0, 1}], EXPR$0=[MIN($2)]) - +- 
LogicalProject(long=[$1], $f1=[TUMBLE($0, 1000:INTERVAL SECOND)], rowtime=[$0]) + +- LogicalProject(long=[$1], $f1=[$TUMBLE($0, 1000:INTERVAL SECOND)], rowtime=[$0]) +- LogicalTableScan(table=[[default_catalog, default_database, MyTable1]]) ]]> </Resource> @@ -334,7 +334,7 @@ FROM MyTable1 <![CDATA[ LogicalProject(rowtime=[TUMBLE_END($1)], long=[$0], EXPR$2=[$2]) +- LogicalAggregate(group=[{0, 1}], EXPR$2=[SUM($2)]) - +- LogicalProject(long=[$1], $f1=[TUMBLE($0, 100:INTERVAL SECOND)], int=[$2]) + +- LogicalProject(long=[$1], $f1=[$TUMBLE($0, 100:INTERVAL SECOND)], int=[$2]) +- LogicalTableScan(table=[[default_catalog, default_database, MyTable1]]) ]]> </Resource> diff --git a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/TableSourceTest.xml b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/TableSourceTest.xml index 837fa08..209eab3 100644 --- a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/TableSourceTest.xml +++ b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/TableSourceTest.xml @@ -117,7 +117,7 @@ FROM rowTimeT WHERE val > 100 <![CDATA[ LogicalProject(name=[$0], EXPR$1=[TUMBLE_END($1)], EXPR$2=[$2]) +- LogicalAggregate(group=[{0, 1}], EXPR$2=[AVG($2)]) - +- LogicalProject(name=[$3], $f1=[TUMBLE($1, 600000:INTERVAL MINUTE)], val=[$2]) + +- LogicalProject(name=[$3], $f1=[$TUMBLE($1, 600000:INTERVAL MINUTE)], val=[$2]) +- LogicalFilter(condition=[>($2, 100)]) +- LogicalTableScan(table=[[default_catalog, default_database, rowTimeT]]) ]]> diff --git a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/agg/WindowAggregateTest.xml b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/agg/WindowAggregateTest.xml index db86af3..13553e4 100644 
--- a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/agg/WindowAggregateTest.xml +++ b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/agg/WindowAggregateTest.xml @@ -34,7 +34,7 @@ FROM MyTable <![CDATA[ LogicalProject(EXPR$0=[$1], EXPR$1=[$2], EXPR$2=[$3], EXPR$3=[$4], EXPR$4=[TUMBLE_START($0)], EXPR$5=[TUMBLE_END($0)]) +- LogicalAggregate(group=[{0}], EXPR$0=[VAR_POP($1)], EXPR$1=[VAR_SAMP($1)], EXPR$2=[STDDEV_POP($1)], EXPR$3=[STDDEV_SAMP($1)]) - +- LogicalProject($f0=[TUMBLE($4, 900000:INTERVAL MINUTE)], c=[$2]) + +- LogicalProject($f0=[$TUMBLE($4, 900000:INTERVAL MINUTE)], c=[$2]) +- LogicalTableScan(table=[[default_catalog, default_database, MyTable]]) ]]> </Resource> @@ -48,53 +48,6 @@ Calc(select=[/(-($f0, /(*($f1, $f1), $f2)), $f2) AS EXPR$0, /(-($f0, /(*($f1, $f ]]> </Resource> </TestCase> - <TestCase name="testWindowAggregateWithDifferentWindows"> - <Resource name="sql"> - <![CDATA[ -WITH window_1h AS ( - SELECT 1 - FROM MyTable - GROUP BY HOP(`rowtime`, INTERVAL '1' HOUR, INTERVAL '1' HOUR) -), - -window_2h AS ( - SELECT 1 - FROM MyTable - GROUP BY HOP(`rowtime`, INTERVAL '1' HOUR, INTERVAL '2' HOUR) -) - -(SELECT * FROM window_1h) -UNION ALL -(SELECT * FROM window_2h) -]]> - </Resource> - <Resource name="planBefore"> - <![CDATA[ -LogicalUnion(all=[true]) -:- LogicalProject(EXPR$0=[1]) -: +- LogicalAggregate(group=[{0}]) -: +- LogicalProject($f0=[HOP($4, 3600000:INTERVAL HOUR, 3600000:INTERVAL HOUR)]) -: +- LogicalTableScan(table=[[default_catalog, default_database, MyTable]]) -+- LogicalProject(EXPR$0=[1]) - +- LogicalAggregate(group=[{0}]) - +- LogicalProject($f0=[HOP($4, 3600000:INTERVAL HOUR, 7200000:INTERVAL HOUR)]) - +- LogicalTableScan(table=[[default_catalog, default_database, MyTable]]) -]]> - </Resource> - <Resource name="planAfter"> - <![CDATA[ -Union(all=[true], union=[EXPR$0]) -:- Calc(select=[1 AS EXPR$0]) -: +- 
GroupWindowAggregate(window=[SlidingGroupWindow('w$, rowtime, 3600000, 3600000)], select=[]) -: +- Exchange(distribution=[single], reuse_id=[1]) -: +- Calc(select=[rowtime]) -: +- DataStreamScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c, proctime, rowtime]) -+- Calc(select=[1 AS EXPR$0]) - +- GroupWindowAggregate(window=[SlidingGroupWindow('w$, rowtime, 7200000, 3600000)], select=[]) - +- Reused(reference_id=[1]) -]]> - </Resource> - </TestCase> <TestCase name="testExpressionOnWindowAuxFunction"> <Resource name="sql"> <![CDATA[ @@ -108,7 +61,7 @@ FROM MyTable <![CDATA[ LogicalProject(EXPR$0=[$1], EXPR$1=[+(TUMBLE_END($0), 60000:INTERVAL MINUTE)]) +- LogicalAggregate(group=[{0}], EXPR$0=[COUNT()]) - +- LogicalProject($f0=[TUMBLE($4, 900000:INTERVAL MINUTE)]) + +- LogicalProject($f0=[$TUMBLE($4, 900000:INTERVAL MINUTE)]) +- LogicalTableScan(table=[[default_catalog, default_database, MyTable]]) ]]> </Resource> @@ -213,7 +166,7 @@ GroupWindowAggregate(window=[SlidingGroupWindow('w$, $f2, 60000, 1000)], select= <![CDATA[ LogicalProject(EXPR$0=[$1]) +- LogicalAggregate(group=[{0}], EXPR$0=[COUNT()]) - +- LogicalProject($f0=[TUMBLE($3, 3024000000:INTERVAL DAY)]) + +- LogicalProject($f0=[$TUMBLE($3, 3024000000:INTERVAL DAY)]) +- LogicalTableScan(table=[[default_catalog, default_database, MyTable]]) ]]> </Resource> @@ -243,7 +196,7 @@ LogicalProject(EXPR$0=[$1]) +- LogicalAggregate(group=[{0}], EXPR$0=[weightedAvg($1, $2)]) +- LogicalProject(b=[$1], c=[$2], a=[$0]) +- LogicalAggregate(group=[{0, 1, 2, 3}]) - +- LogicalProject(a=[$0], b=[$1], c=[$2], $f3=[TUMBLE($4, 900000:INTERVAL MINUTE)]) + +- LogicalProject(a=[$0], b=[$1], c=[$2], $f3=[$TUMBLE($4, 900000:INTERVAL MINUTE)]) +- LogicalTableScan(table=[[default_catalog, default_database, MyTable]]) ]]> </Resource> @@ -380,9 +333,9 @@ GROUP BY TUMBLE(zzzzz, INTERVAL '0.004' SECOND) <![CDATA[ LogicalProject(EXPR$0=[TUMBLE_ROWTIME($0)], EXPR$1=[TUMBLE_END($0)], a=[$1]) +- 
LogicalAggregate(group=[{0}], a=[COUNT()]) - +- LogicalProject($f0=[TUMBLE(TUMBLE_ROWTIME($0), 4:INTERVAL SECOND)], a=[$1]) + +- LogicalProject($f0=[$TUMBLE(TUMBLE_ROWTIME($0), 4:INTERVAL SECOND)], a=[$1]) +- LogicalAggregate(group=[{0}], a=[COUNT($1)]) - +- LogicalProject($f0=[TUMBLE($4, 2:INTERVAL SECOND)], a=[$0]) + +- LogicalProject($f0=[$TUMBLE($4, 2:INTERVAL SECOND)], a=[$0]) +- LogicalTableScan(table=[[default_catalog, default_database, MyTable]]) ]]> </Resource> @@ -416,7 +369,7 @@ LogicalProject(EXPR$0=[$2]) +- LogicalAggregate(group=[{0, 1}], EXPR$0=[weightedAvg($2, $3)]) +- LogicalProject(b=[$1], d=[$4], c=[$2], a=[$0]) +- LogicalAggregate(group=[{0, 1, 2, 3}], d=[COUNT()]) - +- LogicalProject(a=[$0], b=[$1], c=[$2], $f3=[TUMBLE($4, 900000:INTERVAL MINUTE)]) + +- LogicalProject(a=[$0], b=[$1], c=[$2], $f3=[$TUMBLE($4, 900000:INTERVAL MINUTE)]) +- LogicalTableScan(table=[[default_catalog, default_database, MyTable]]) ]]> </Resource> @@ -453,7 +406,7 @@ GROUP BY TUMBLE(rowtime, INTERVAL '15' MINUTE) <![CDATA[ LogicalProject(s=[$1], a=[$2], wStart=[TUMBLE_START($0)]) +- LogicalAggregate(group=[{0}], s=[SUM($1)], a=[AVG($1)]) - +- LogicalProject($f0=[TUMBLE($4, 900000:INTERVAL MINUTE)], correct=[CASE(=($0, 1), 1, 99)]) + +- LogicalProject($f0=[$TUMBLE($4, 900000:INTERVAL MINUTE)], correct=[CASE(=($0, 1), 1, 99)]) +- LogicalTableScan(table=[[default_catalog, default_database, MyTable]]) ]]> </Resource> @@ -520,37 +473,32 @@ GroupWindowAggregate(window=[SessionGroupWindow('w$, $f2, 60000)], select=[SUM(a ]]> </Resource> </TestCase> - <TestCase name="testTumbleFunAndRegularAggFunInGroupBy"> + <TestCase name="testTumbleFunction"> <Resource name="sql"> <![CDATA[ -SELECT weightedAvg(c, a) FROM - (SELECT a, b, c, count(*) d, - TUMBLE_START(rowtime, INTERVAL '15' MINUTE) as ping_start - FROM MyTable - GROUP BY a, b, c, TUMBLE(rowtime, INTERVAL '15' MINUTE)) AS t1 -GROUP BY b, d, ping_start +SELECT COUNT(*), + weightedAvg(c, a) AS wAvg, + TUMBLE_START(rowtime, 
INTERVAL '15' MINUTE), + TUMBLE_END(rowtime, INTERVAL '15' MINUTE) +FROM MyTable + GROUP BY TUMBLE(rowtime, INTERVAL '15' MINUTE) ]]> </Resource> <Resource name="planBefore"> <![CDATA[ -LogicalProject(EXPR$0=[$3]) -+- LogicalAggregate(group=[{0, 1, 2}], EXPR$0=[weightedAvg($3, $4)]) - +- LogicalProject(b=[$1], d=[$4], ping_start=[TUMBLE_START($3)], c=[$2], a=[$0]) - +- LogicalAggregate(group=[{0, 1, 2, 3}], d=[COUNT()]) - +- LogicalProject(a=[$0], b=[$1], c=[$2], $f3=[TUMBLE($4, 900000:INTERVAL MINUTE)]) - +- LogicalTableScan(table=[[default_catalog, default_database, MyTable]]) +LogicalProject(EXPR$0=[$1], wAvg=[$2], EXPR$2=[TUMBLE_START($0)], EXPR$3=[TUMBLE_END($0)]) ++- LogicalAggregate(group=[{0}], EXPR$0=[COUNT()], wAvg=[weightedAvg($1, $2)]) + +- LogicalProject($f0=[$TUMBLE($4, 900000:INTERVAL MINUTE)], c=[$2], a=[$0]) + +- LogicalTableScan(table=[[default_catalog, default_database, MyTable]]) ]]> </Resource> <Resource name="planAfter"> <![CDATA[ -Calc(select=[EXPR$0]) -+- GroupAggregate(groupBy=[b, d, ping_start], select=[b, d, ping_start, weightedAvg(c, a) AS EXPR$0]) - +- Exchange(distribution=[hash[b, d, ping_start]]) - +- Calc(select=[b, d, w$start AS ping_start, c, a]) - +- GroupWindowAggregate(groupBy=[a, b, c], window=[TumblingGroupWindow('w$, rowtime, 900000)], properties=[w$start, w$end, w$rowtime, w$proctime], select=[a, b, c, COUNT(*) AS d, start('w$) AS w$start, end('w$) AS w$end, rowtime('w$) AS w$rowtime, proctime('w$) AS w$proctime]) - +- Exchange(distribution=[hash[a, b, c]]) - +- Calc(select=[a, b, c, rowtime]) - +- DataStreamScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c, proctime, rowtime]) +Calc(select=[EXPR$0, wAvg, w$start AS EXPR$2, w$end AS EXPR$3]) ++- GroupWindowAggregate(window=[TumblingGroupWindow('w$, rowtime, 900000)], properties=[w$start, w$end, w$rowtime, w$proctime], select=[COUNT(*) AS EXPR$0, weightedAvg(c, a) AS wAvg, start('w$) AS w$start, end('w$) AS w$end, rowtime('w$) AS w$rowtime, 
proctime('w$) AS w$proctime]) + +- Exchange(distribution=[single]) + +- Calc(select=[rowtime, c, a]) + +- DataStreamScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c, proctime, rowtime]) ]]> </Resource> </TestCase> @@ -572,7 +520,7 @@ LogicalProject(EXPR$0=[$2]) +- LogicalAggregate(group=[{0, 1}], EXPR$0=[weightedAvg($2, $3)]) +- LogicalProject(b=[$1], ping_start=[TUMBLE_START($3)], c=[$2], a=[$0]) +- LogicalAggregate(group=[{0, 1, 2, 3}]) - +- LogicalProject(a=[$0], b=[$1], c=[$2], $f3=[TUMBLE($4, 900000:INTERVAL MINUTE)]) + +- LogicalProject(a=[$0], b=[$1], c=[$2], $f3=[$TUMBLE($4, 900000:INTERVAL MINUTE)]) +- LogicalTableScan(table=[[default_catalog, default_database, MyTable]]) ]]> </Resource> @@ -589,94 +537,104 @@ Calc(select=[EXPR$0]) ]]> </Resource> </TestCase> - <TestCase name="testTumbleFunction"> + <TestCase name="testTumblingWindowWithProctime"> <Resource name="sql"> - <![CDATA[ -SELECT COUNT(*), - weightedAvg(c, a) AS wAvg, - TUMBLE_START(rowtime, INTERVAL '15' MINUTE), - TUMBLE_END(rowtime, INTERVAL '15' MINUTE) -FROM MyTable - GROUP BY TUMBLE(rowtime, INTERVAL '15' MINUTE) - ]]> + <![CDATA[select sum(a), max(b) from MyTable1 group by TUMBLE(c, INTERVAL '1' SECOND)]]> </Resource> <Resource name="planBefore"> <![CDATA[ -LogicalProject(EXPR$0=[$1], wAvg=[$2], EXPR$2=[TUMBLE_START($0)], EXPR$3=[TUMBLE_END($0)]) -+- LogicalAggregate(group=[{0}], EXPR$0=[COUNT()], wAvg=[weightedAvg($1, $2)]) - +- LogicalProject($f0=[TUMBLE($4, 900000:INTERVAL MINUTE)], c=[$2], a=[$0]) - +- LogicalTableScan(table=[[default_catalog, default_database, MyTable]]) +LogicalProject(EXPR$0=[$1], EXPR$1=[$2]) ++- LogicalAggregate(group=[{0}], EXPR$0=[SUM($1)], EXPR$1=[MAX($2)]) + +- LogicalProject($f0=[$TUMBLE(PROCTIME(), 1000:INTERVAL SECOND)], a=[$0], b=[$1]) + +- LogicalTableScan(table=[[default_catalog, default_database, MyTable1, source: [CollectionTableSource(a, b)]]]) ]]> </Resource> <Resource name="planAfter"> <![CDATA[ -Calc(select=[EXPR$0, wAvg, 
w$start AS EXPR$2, w$end AS EXPR$3]) -+- GroupWindowAggregate(window=[TumblingGroupWindow('w$, rowtime, 900000)], properties=[w$start, w$end, w$rowtime, w$proctime], select=[COUNT(*) AS EXPR$0, weightedAvg(c, a) AS wAvg, start('w$) AS w$start, end('w$) AS w$end, rowtime('w$) AS w$rowtime, proctime('w$) AS w$proctime]) - +- Exchange(distribution=[single]) - +- Calc(select=[rowtime, c, a]) - +- DataStreamScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c, proctime, rowtime]) +GroupWindowAggregate(window=[TumblingGroupWindow('w$, $f2, 1000)], select=[SUM(a) AS EXPR$0, MAX(b) AS EXPR$1]) ++- Exchange(distribution=[single]) + +- TableSourceScan(table=[[default_catalog, default_database, MyTable1, source: [CollectionTableSource(a, b)]]], fields=[a, b]) ]]> </Resource> </TestCase> - <TestCase name="testTumblingWindowWithProctime"> + <TestCase name="testTumbleFunAndRegularAggFunInGroupBy"> <Resource name="sql"> - <![CDATA[select sum(a), max(b) from MyTable1 group by TUMBLE(c, INTERVAL '1' SECOND)]]> + <![CDATA[ +SELECT weightedAvg(c, a) FROM + (SELECT a, b, c, count(*) d, + TUMBLE_START(rowtime, INTERVAL '15' MINUTE) as ping_start + FROM MyTable + GROUP BY a, b, c, TUMBLE(rowtime, INTERVAL '15' MINUTE)) AS t1 +GROUP BY b, d, ping_start + ]]> </Resource> <Resource name="planBefore"> <![CDATA[ -LogicalProject(EXPR$0=[$1], EXPR$1=[$2]) -+- LogicalAggregate(group=[{0}], EXPR$0=[SUM($1)], EXPR$1=[MAX($2)]) - +- LogicalProject($f0=[TUMBLE(PROCTIME(), 1000:INTERVAL SECOND)], a=[$0], b=[$1]) - +- LogicalTableScan(table=[[default_catalog, default_database, MyTable1, source: [CollectionTableSource(a, b)]]]) +LogicalProject(EXPR$0=[$3]) ++- LogicalAggregate(group=[{0, 1, 2}], EXPR$0=[weightedAvg($3, $4)]) + +- LogicalProject(b=[$1], d=[$4], ping_start=[TUMBLE_START($3)], c=[$2], a=[$0]) + +- LogicalAggregate(group=[{0, 1, 2, 3}], d=[COUNT()]) + +- LogicalProject(a=[$0], b=[$1], c=[$2], $f3=[$TUMBLE($4, 900000:INTERVAL MINUTE)]) + +- 
LogicalTableScan(table=[[default_catalog, default_database, MyTable]]) ]]> </Resource> <Resource name="planAfter"> <![CDATA[ -GroupWindowAggregate(window=[TumblingGroupWindow('w$, $f2, 1000)], select=[SUM(a) AS EXPR$0, MAX(b) AS EXPR$1]) -+- Exchange(distribution=[single]) - +- TableSourceScan(table=[[default_catalog, default_database, MyTable1, source: [CollectionTableSource(a, b)]]], fields=[a, b]) +Calc(select=[EXPR$0]) ++- GroupAggregate(groupBy=[b, d, ping_start], select=[b, d, ping_start, weightedAvg(c, a) AS EXPR$0]) + +- Exchange(distribution=[hash[b, d, ping_start]]) + +- Calc(select=[b, d, w$start AS ping_start, c, a]) + +- GroupWindowAggregate(groupBy=[a, b, c], window=[TumblingGroupWindow('w$, rowtime, 900000)], properties=[w$start, w$end, w$rowtime, w$proctime], select=[a, b, c, COUNT(*) AS d, start('w$) AS w$start, end('w$) AS w$end, rowtime('w$) AS w$rowtime, proctime('w$) AS w$proctime]) + +- Exchange(distribution=[hash[a, b, c]]) + +- Calc(select=[a, b, c, rowtime]) + +- DataStreamScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c, proctime, rowtime]) ]]> </Resource> </TestCase> <TestCase name="testWindowAggregateWithDifferentWindows"> <Resource name="sql"> <![CDATA[ -SELECT - SUM(correct) AS s, - AVG(correct) AS a, - TUMBLE_START(rowtime, INTERVAL '15' MINUTE) AS wStart -FROM ( - SELECT CASE a - WHEN 1 THEN 1 - ELSE 99 - END AS correct, rowtime - FROM MyTable +WITH window_1h AS ( + SELECT 1 + FROM MyTable + GROUP BY HOP(`rowtime`, INTERVAL '1' HOUR, INTERVAL '1' HOUR) +), + +window_2h AS ( + SELECT 1 + FROM MyTable + GROUP BY HOP(`rowtime`, INTERVAL '1' HOUR, INTERVAL '2' HOUR) ) -GROUP BY TUMBLE(rowtime, INTERVAL '15' MINUTE) - ]]> + +(SELECT * FROM window_1h) +UNION ALL +(SELECT * FROM window_2h) +]]> </Resource> <Resource name="planBefore"> <![CDATA[ -LogicalProject(s=[$1], a=[$2], wStart=[TUMBLE_START($0)]) -+- LogicalAggregate(group=[{0}], s=[SUM($1)], a=[AVG($1)]) - +- LogicalProject($f0=[TUMBLE($4, 900000:INTERVAL 
MINUTE)], correct=[CASE(=($0, 1), 1, 99)]) - +- LogicalTableScan(table=[[default_catalog, default_database, MyTable]]) +LogicalUnion(all=[true]) +:- LogicalProject(EXPR$0=[1]) +: +- LogicalAggregate(group=[{0}]) +: +- LogicalProject($f0=[HOP($4, 3600000:INTERVAL HOUR, 3600000:INTERVAL HOUR)]) +: +- LogicalTableScan(table=[[default_catalog, default_database, MyTable]]) ++- LogicalProject(EXPR$0=[1]) + +- LogicalAggregate(group=[{0}]) + +- LogicalProject($f0=[HOP($4, 3600000:INTERVAL HOUR, 7200000:INTERVAL HOUR)]) + +- LogicalTableScan(table=[[default_catalog, default_database, MyTable]]) ]]> </Resource> <Resource name="planAfter"> <![CDATA[ Union(all=[true], union=[EXPR$0]) :- Calc(select=[1 AS EXPR$0]) -: +- HashWindowAggregate(window=[SlidingGroupWindow('w$, ts, 3600000, 3600000)], select=[]) -: +- Exchange(distribution=[single]) -: +- LocalHashWindowAggregate(window=[SlidingGroupWindow('w$, ts, 3600000, 3600000)], select=[]) -: +- Calc(select=[ts], reuse_id=[1]) -: +- TableSourceScan(table=[[default_catalog, default_database, MyTable2, source: [TestTableSource(a, b, c, d, ts)]]], fields=[a, b, c, d, ts]) +: +- GroupWindowAggregate(window=[SlidingGroupWindow('w$, rowtime, 3600000, 3600000)], select=[]) +: +- Exchange(distribution=[single], reuse_id=[1]) +: +- Calc(select=[rowtime]) +: +- DataStreamScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c, proctime, rowtime]) +- Calc(select=[1 AS EXPR$0]) - +- HashWindowAggregate(window=[SlidingGroupWindow('w$, ts, 7200000, 3600000)], select=[]) - +- Exchange(distribution=[single]) - +- LocalHashWindowAggregate(window=[SlidingGroupWindow('w$, ts, 7200000, 3600000)], select=[]) - +- Reused(reference_id=[1]) + +- GroupWindowAggregate(window=[SlidingGroupWindow('w$, rowtime, 7200000, 3600000)], select=[]) + +- Reused(reference_id=[1]) ]]> </Resource> </TestCase> diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/catalog/BasicOperatorTable.scala 
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/catalog/BasicOperatorTable.scala index 11d9a4b..5ec1fc7 100644 --- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/catalog/BasicOperatorTable.scala +++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/catalog/BasicOperatorTable.scala @@ -260,6 +260,8 @@ object BasicOperatorTable { */ val TUMBLE: SqlGroupedWindowFunction = new SqlGroupedWindowFunction( + // The TUMBLE group function was hard code to $TUMBLE in CALCITE-3382. + "$TUMBLE", SqlKind.TUMBLE, null, OperandTypes.or(OperandTypes.DATETIME_INTERVAL, OperandTypes.DATETIME_INTERVAL_TIME)) {
