This is an automated email from the ASF dual-hosted git repository. snuyanzin pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit f4328b3729aa32c58742dc136e9c994101f8c6ee Author: Sergey Nuyanzin <snuyan...@gmail.com> AuthorDate: Tue Jul 29 17:11:27 2025 +0200 [FLINK-38150][table] Use `RANGE` in plans instead of `RANG` This closes #26840. --- .../table/planner/plan/utils/RelExplainUtil.scala | 2 +- .../planner/plan/batch/sql/DeadlockBreakupTest.xml | 6 +- .../plan/batch/sql/ForwardHashExchangeTest.xml | 8 +-- .../planner/plan/batch/sql/RemoveCollationTest.xml | 2 +- .../planner/plan/batch/sql/RemoveShuffleTest.xml | 10 +-- .../planner/plan/batch/sql/SubplanReuseTest.xml | 6 +- .../batch/sql/agg/AggregateReduceGroupingTest.xml | 2 +- .../plan/batch/sql/agg/OverAggregateTest.xml | 82 +++++++++++----------- .../sql/join/BroadcastHashSemiAntiJoinTest.xml | 6 +- .../batch/sql/join/NestedLoopSemiAntiJoinTest.xml | 6 +- .../plan/batch/sql/join/SemiAntiJoinTest.xml | 6 +- .../sql/join/ShuffledHashSemiAntiJoinTest.xml | 6 +- .../batch/sql/join/SortMergeSemiAntiJoinTest.xml | 6 +- .../batch/table/PythonOverWindowAggregateTest.xml | 2 +- .../testProcTimeBoundedNonPartitionedRangeOver.out | 2 +- .../testProcTimeBoundedPartitionedRangeOver.out | 2 +- .../testProcTimeUnboundedPartitionedRangeOver.out | 2 +- .../PushLocalAggIntoTableSourceScanRuleTest.xml | 2 +- .../planner/plan/stream/sql/MatchRecognizeTest.xml | 2 +- .../planner/plan/stream/sql/SubplanReuseTest.xml | 4 +- .../plan/stream/sql/agg/OverAggregateTest.xml | 22 +++--- .../plan/stream/sql/agg/WindowAggregateTest.xml | 4 +- .../plan/stream/sql/join/SemiAntiJoinTest.xml | 4 +- .../plan/stream/table/OverAggregateTest.xml | 20 +++--- .../stream/table/PythonOverWindowAggregateTest.xml | 4 +- .../planner/plan/stream/table/TableSourceTest.xml | 2 +- .../over-aggregate-unbounded-partitioned-rows.json | 4 +- ...partitioned-rows-with-out-of-order-records.json | 4 +- ...ver-aggregate-bounded-non-partitioned-rows.json | 4 +- ...partitioned-rows-with-out-of-order-records.json | 2 +- .../over-aggregate-bounded-partitioned-rows.json | 2 +- .../plan/over-aggregate-lag.json | 4 +- ...e-non-time-range-unbounded-avg-append-mode.json | 4 +- ...-range-unbounded-multiple-aggs-append-mode.json | 4 +- ...e-non-time-range-unbounded-sum-append-mode.json | 4 +- ...n-time-range-unbounded-sum-no-partition-by.json | 4 +- ...nbounded-sum-retract-mode-sink-primary-key.json | 4 +- ...nge-unbounded-sum-retract-mode-sort-by-key.json | 4 +- ...ounded-sum-retract-mode-source-primary-key.json | 4 +- ...ource-sink-primary-key-partition-by-non-pk.json | 4 +- ...d-sum-retract-mode-source-sink-primary-key.json | 4 +- ...-non-time-range-unbounded-sum-retract-mode.json | 4 +- ...partitioned-rows-with-out-of-order-records.json | 4 +- .../over-aggregate-unbounded-partitioned-rows.json | 2 +- 44 files changed, 143 insertions(+), 143 deletions(-) diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/RelExplainUtil.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/RelExplainUtil.scala index 621c3070096..970ed8fae50 100644 --- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/RelExplainUtil.scala +++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/RelExplainUtil.scala @@ -621,7 +621,7 @@ object RelExplainUtil { } val buf = new StringBuilder - buf.append(if (groupWindow.isRows) " ROWS " else " RANG ") + buf.append(if (groupWindow.isRows) " ROWS " else " RANGE ") val lowerBound = groupWindow.lowerBound val upperBound = groupWindow.upperBound if (lowerBound != null) { diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/DeadlockBreakupTest.xml b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/DeadlockBreakupTest.xml index 5d23710c48b..d347bb578cb 100644 --- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/DeadlockBreakupTest.xml +++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/DeadlockBreakupTest.xml @@ -539,18 +539,18 @@ LogicalProject(a=[$0], b=[$1], c=[$2], a0=[$3], b0=[$4], c0=[$5]) HashJoin(joinType=[InnerJoin], where=[(c = c0)], select=[a, b, c, a0, b0, c0], build=[left]) :- Exchange(distribution=[hash[c]]) : +- Calc(select=[w0$o0 AS a, b, c]) -: +- OverAggregate(partitionBy=[b], orderBy=[b ASC], window#0=[MAX(a) AS w0$o0 RANG BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING], select=[a, b, c, w0$o0]) +: +- OverAggregate(partitionBy=[b], orderBy=[b ASC], window#0=[MAX(a) AS w0$o0 RANGE BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING], select=[a, b, c, w0$o0]) : +- Exchange(distribution=[forward]) : +- Calc(select=[CASE((w0$o0 > 0), w0$o1, null:INTEGER) AS a, b, c])(reuse_id=[1]) : +- Exchange(distribution=[forward]) -: +- OverAggregate(partitionBy=[b], orderBy=[b ASC], window#0=[COUNT(a) AS w0$o0, $SUM0(a) AS w0$o1 RANG BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], select=[a, b, c, w0$o0, w0$o1]) +: +- OverAggregate(partitionBy=[b], orderBy=[b ASC], window#0=[COUNT(a) AS w0$o0, $SUM0(a) AS w0$o1 RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], select=[a, b, c, w0$o0, w0$o1]) : +- Exchange(distribution=[forward]) : +- Sort(orderBy=[b ASC]) : +- Exchange(distribution=[hash[b]]) : +- TableSourceScan(table=[[default_catalog, default_database, x]], fields=[a, b, c]) +- Exchange(distribution=[hash[c]], shuffle_mode=[BATCH]) +- Calc(select=[w0$o0 AS a, b, c]) - +- OverAggregate(partitionBy=[b], orderBy=[b ASC], window#0=[MIN(a) AS w0$o0 RANG BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING], select=[a, b, c, w0$o0]) + +- OverAggregate(partitionBy=[b], orderBy=[b ASC], window#0=[MIN(a) AS w0$o0 RANGE BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING], select=[a, b, c, w0$o0]) +- Exchange(distribution=[forward]) +- Reused(reference_id=[1]) ]]> diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/ForwardHashExchangeTest.xml b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/ForwardHashExchangeTest.xml index d3094ef8e6a..c623d637ad9 100644 --- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/ForwardHashExchangeTest.xml +++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/ForwardHashExchangeTest.xml @@ -298,7 +298,7 @@ LogicalProject(b=[$0], EXPR$1=[RANK() OVER (ORDER BY $0 NULLS FIRST)]) </Resource> <Resource name="optimized exec plan"> <![CDATA[ -OverAggregate(orderBy=[b ASC], window#0=[RANK(*) AS w0$o0 RANG BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], select=[b, w0$o0]) +OverAggregate(orderBy=[b ASC], window#0=[RANK(*) AS w0$o0 RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], select=[b, w0$o0]) +- Exchange(distribution=[forward]) +- Sort(orderBy=[b ASC]) +- Exchange(distribution=[forward]) @@ -330,7 +330,7 @@ LogicalProject(sum_b=[$1], avg_b=[/(CASE(>(COUNT($1) OVER (PARTITION BY $0), 0), <Resource name="optimized exec plan"> <![CDATA[ Calc(select=[sum_b, (CASE((w0$o0 > 0), w0$o1, null:BIGINT) / w0$o0) AS avg_b, w1$o0 AS rn, c]) -+- OverAggregate(partitionBy=[c], window#0=[COUNT(sum_b) AS w0$o0, $SUM0(sum_b) AS w0$o1 RANG BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING], window#1=[RANK(*) AS w1$o0 RANG BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], select=[c, sum_b, w0$o0, w0$o1, w1$o0]) ++- OverAggregate(partitionBy=[c], window#0=[COUNT(sum_b) AS w0$o0, $SUM0(sum_b) AS w0$o1 RANGE BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING], window#1=[RANK(*) AS w1$o0 RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], select=[c, sum_b, w0$o0, w0$o1, w1$o0]) +- Exchange(distribution=[forward]) +- Sort(orderBy=[c ASC]) +- Exchange(distribution=[keep_input_as_is[hash[c]]]) @@ -355,7 +355,7 @@ LogicalProject(b=[$0], EXPR$1=[RANK() OVER (ORDER BY $0 NULLS FIRST)]) </Resource> <Resource name="optimized exec plan"> <![CDATA[ -OverAggregate(orderBy=[b ASC], window#0=[RANK(*) AS w0$o0 RANG BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], select=[b, w0$o0]) +OverAggregate(orderBy=[b ASC], window#0=[RANK(*) AS w0$o0 RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], select=[b, w0$o0]) +- Exchange(distribution=[forward]) +- Sort(orderBy=[b ASC]) +- Exchange(distribution=[forward]) @@ -387,7 +387,7 @@ LogicalProject(sum_b=[$1], avg_b=[/(CASE(>(COUNT($1) OVER (PARTITION BY $0), 0), <Resource name="optimized exec plan"> <![CDATA[ Calc(select=[sum_b, (CASE((w0$o0 > 0), w0$o1, null:BIGINT) / w0$o0) AS avg_b, w1$o0 AS rn, c]) -+- OverAggregate(partitionBy=[c], window#0=[COUNT(sum_b) AS w0$o0, $SUM0(sum_b) AS w0$o1 RANG BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING], window#1=[RANK(*) AS w1$o0 RANG BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], select=[c, sum_b, w0$o0, w0$o1, w1$o0]) ++- OverAggregate(partitionBy=[c], window#0=[COUNT(sum_b) AS w0$o0, $SUM0(sum_b) AS w0$o1 RANGE BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING], window#1=[RANK(*) AS w1$o0 RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], select=[c, sum_b, w0$o0, w0$o1, w1$o0]) +- Exchange(distribution=[forward]) +- SortAggregate(isMerge=[true], groupBy=[c], select=[c, Final_SUM(sum$0) AS sum_b]) +- Exchange(distribution=[forward]) diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/RemoveCollationTest.xml b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/RemoveCollationTest.xml index 958420904ce..baac5667b37 100644 --- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/RemoveCollationTest.xml +++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/RemoveCollationTest.xml @@ -503,7 +503,7 @@ LogicalProject(sum_b=[$1], avg_b=[/(CASE(>(COUNT($1) OVER (PARTITION BY $0 ORDER <Resource name="optimized exec plan"> <![CDATA[ Calc(select=[sum_b, (CASE((w0$o0 > 0), w0$o1, null:BIGINT) / w0$o0) AS avg_b, w0$o2 AS rn]) -+- OverAggregate(partitionBy=[a], orderBy=[a ASC], window#0=[COUNT(sum_b) AS w0$o0, $SUM0(sum_b) AS w0$o1 RANG BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], window#1=[RANK(*) AS w0$o2 RANG BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], select=[a, sum_b, w0$o0, w0$o1, w0$o2]) ++- OverAggregate(partitionBy=[a], orderBy=[a ASC], window#0=[COUNT(sum_b) AS w0$o0, $SUM0(sum_b) AS w0$o1 RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], window#1=[RANK(*) AS w0$o2 RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], select=[a, sum_b, w0$o0, w0$o1, w0$o2]) +- Exchange(distribution=[forward]) +- SortAggregate(isMerge=[true], groupBy=[a], select=[a, Final_SUM(sum$0) AS sum_b]) +- Exchange(distribution=[forward]) diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/RemoveShuffleTest.xml b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/RemoveShuffleTest.xml index 97a174f1905..2fc7095c9c4 100644 --- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/RemoveShuffleTest.xml +++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/RemoveShuffleTest.xml @@ -561,11 +561,11 @@ LogicalProject(sum_b=[$2], avg_b=[/(CASE(>(COUNT($2) OVER (PARTITION BY $0, $1), <Resource name="optimized exec plan"> <![CDATA[ Calc(select=[sum_b, (CASE((w0$o0 > 0), w0$o1, null:BIGINT) / w0$o0) AS avg_b, w1$o0 AS rn, c]) -+- OverAggregate(partitionBy=[c], orderBy=[a ASC, c ASC], window#0=[RANK(*) AS w1$o0 RANG BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], select=[a, c, sum_b, w0$o0, w0$o1, w1$o0]) ++- OverAggregate(partitionBy=[c], orderBy=[a ASC, c ASC], window#0=[RANK(*) AS w1$o0 RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], select=[a, c, sum_b, w0$o0, w0$o1, w1$o0]) +- Exchange(distribution=[forward]) +- Sort(orderBy=[c ASC, a ASC]) +- Exchange(distribution=[hash[c]]) - +- OverAggregate(partitionBy=[a, c], window#0=[COUNT(sum_b) AS w0$o0, $SUM0(sum_b) AS w0$o1 RANG BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING], select=[a, c, sum_b, w0$o0, w0$o1]) + +- OverAggregate(partitionBy=[a, c], window#0=[COUNT(sum_b) AS w0$o0, $SUM0(sum_b) AS w0$o1 RANGE BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING], select=[a, c, sum_b, w0$o0, w0$o1]) +- Exchange(distribution=[forward]) +- Sort(orderBy=[a ASC, c ASC]) +- Exchange(distribution=[keep_input_as_is[hash[a, c]]]) @@ -639,7 +639,7 @@ LogicalProject(sum_b=[$1], avg_b=[/(CASE(>(COUNT($1) OVER (PARTITION BY $0), 0), <Resource name="optimized exec plan"> <![CDATA[ Calc(select=[sum_b, (CASE((w0$o0 > 0), w0$o1, null:BIGINT) / w0$o0) AS avg_b, w1$o0 AS rn, c]) -+- OverAggregate(partitionBy=[c], window#0=[COUNT(sum_b) AS w0$o0, $SUM0(sum_b) AS w0$o1 RANG BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING], window#1=[RANK(*) AS w1$o0 RANG BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], select=[c, sum_b, w0$o0, w0$o1, w1$o0]) ++- OverAggregate(partitionBy=[c], window#0=[COUNT(sum_b) AS w0$o0, $SUM0(sum_b) AS w0$o1 RANGE BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING], window#1=[RANK(*) AS w1$o0 RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], select=[c, sum_b, w0$o0, w0$o1, w1$o0]) +- Exchange(distribution=[forward]) +- Sort(orderBy=[c ASC]) +- Exchange(distribution=[keep_input_as_is[hash[c]]]) @@ -674,7 +674,7 @@ LogicalProject(sum_b=[$2], avg_b=[/(CASE(>(COUNT($2) OVER (PARTITION BY $1), 0), <Resource name="optimized exec plan"> <![CDATA[ Calc(select=[sum_b, (CASE((w0$o0 > 0), w0$o1, null:BIGINT) / w0$o0) AS avg_b, w1$o0 AS rn, c]) -+- OverAggregate(partitionBy=[c], window#0=[COUNT(sum_b) AS w0$o0, $SUM0(sum_b) AS w0$o1 RANG BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING], window#1=[RANK(*) AS w1$o0 RANG BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], select=[c, sum_b, w0$o0, w0$o1, w1$o0]) ++- OverAggregate(partitionBy=[c], window#0=[COUNT(sum_b) AS w0$o0, $SUM0(sum_b) AS w0$o1 RANGE BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING], window#1=[RANK(*) AS w1$o0 RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], select=[c, sum_b, w0$o0, w0$o1, w1$o0]) +- Exchange(distribution=[forward]) +- Sort(orderBy=[c ASC]) +- Exchange(distribution=[hash[c]]) @@ -1340,7 +1340,7 @@ LogicalProject(sum_b=[$0], avg_b=[/(CASE(>(COUNT($0) OVER (), 0), $SUM0($0) OVER <Resource name="optimized exec plan"> <![CDATA[ Calc(select=[sum_b, (CASE((w0$o0 > 0), w0$o1, null:BIGINT) / w0$o0) AS avg_b]) -+- OverAggregate(window#0=[COUNT(sum_b) AS w0$o0, $SUM0(sum_b) AS w0$o1 RANG BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING], select=[sum_b, w0$o0, w0$o1]) ++- OverAggregate(window#0=[COUNT(sum_b) AS w0$o0, $SUM0(sum_b) AS w0$o1 RANGE BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING], select=[sum_b, w0$o0, w0$o1]) +- HashAggregate(isMerge=[true], select=[Final_SUM(sum$0) AS sum_b]) +- Exchange(distribution=[single]) +- LocalHashAggregate(select=[Partial_SUM(b) AS sum$0]) diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/SubplanReuseTest.xml b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/SubplanReuseTest.xml index 281a28978e7..4fb095acffc 100644 --- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/SubplanReuseTest.xml +++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/SubplanReuseTest.xml @@ -344,7 +344,7 @@ Calc(select=[c, e, avg_b, sum_b, sum_b0 AS psum, sum_b1 AS nsum, avg_b0 AS avg_b +- MultipleInput(readOrder=[2,0,1], members=[\nHashJoin(joinType=[InnerJoin], where=[((c = c0) AND (e = e0) AND (rn = $f5))], select=[sum_b, avg_b, rn, c, e, sum_b0, avg_b0, sum_b1, c0, e0, $f5], build=[left])\n:- Calc(select=[sum_b, avg_b, rn, c, e, sum_b0, avg_b0])\n: +- HashJoin(joinType=[InnerJoin], where=[((c = c0) AND (e = e0) AND (rn = $f5))], select=[sum_b, avg_b, rn, c, e, sum_b0, avg_b0, c0, e0, $f5], build=[left])\n: :- [#2] Exchange(distribution=[hash[c, e, rn]])\n: [...] :- Exchange(distribution=[hash[c, e, $f5]]) : +- Calc(select=[sum_b, c, e, (w1$o0 - 1) AS $f5], where=[(c <> '')]) - : +- OverAggregate(partitionBy=[c, e], window#0=[COUNT(sum_b) AS w0$o0, $SUM0(sum_b) AS w0$o1 RANG BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING], window#1=[RANK(*) AS w1$o0 RANG BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], select=[c, e, sum_b, w0$o0, w0$o1, w1$o0])(reuse_id=[1]) + : +- OverAggregate(partitionBy=[c, e], window#0=[COUNT(sum_b) AS w0$o0, $SUM0(sum_b) AS w0$o1 RANGE BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING], window#1=[RANK(*) AS w1$o0 RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], select=[c, e, sum_b, w0$o0, w0$o1, w1$o0])(reuse_id=[1]) : +- Exchange(distribution=[forward]) : +- Sort(orderBy=[c ASC, e ASC]) : +- Exchange(distribution=[keep_input_as_is[hash[c, e]]]) @@ -1067,7 +1067,7 @@ LogicalProject(a=[$0], b=[$1], EXPR$2=[$2], a0=[$3], b0=[$4], EXPR$20=[$5]) HashJoin(joinType=[InnerJoin], where=[(a = a0)], select=[a, b, w0$o0, a0, b0, w0$o00], build=[left]) :- Exchange(distribution=[hash[a]]) : +- Calc(select=[a, b, w0$o0], where=[(b < 100)]) -: +- OverAggregate(partitionBy=[c], orderBy=[c DESC], window#0=[MyFirst(c) AS w0$o0 RANG BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], select=[a, b, c, w0$o0])(reuse_id=[1]) +: +- OverAggregate(partitionBy=[c], orderBy=[c DESC], window#0=[MyFirst(c) AS w0$o0 RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], select=[a, b, c, w0$o0])(reuse_id=[1]) : +- Exchange(distribution=[forward]) : +- Sort(orderBy=[c DESC]) : +- Exchange(distribution=[hash[c]]) @@ -1176,7 +1176,7 @@ LogicalProject(a=[$0], b=[$1], EXPR$2=[$2], a0=[$3], b0=[$4], EXPR$20=[$5]) HashJoin(joinType=[InnerJoin], where=[(a = a0)], select=[a, b, w0$o0, a0, b0, w0$o00], build=[left]) :- Exchange(distribution=[hash[a]]) : +- Calc(select=[a, b, w0$o0], where=[(b < 100)]) -: +- OverAggregate(orderBy=[c DESC], window#0=[RANK(*) AS w0$o0 RANG BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], select=[a, b, c, w0$o0])(reuse_id=[1]) +: +- OverAggregate(orderBy=[c DESC], window#0=[RANK(*) AS w0$o0 RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], select=[a, b, c, w0$o0])(reuse_id=[1]) : +- Sort(orderBy=[c DESC]) : +- Exchange(distribution=[single]) : +- TableSourceScan(table=[[default_catalog, default_database, x]], fields=[a, b, c]) diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/agg/AggregateReduceGroupingTest.xml b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/agg/AggregateReduceGroupingTest.xml index 855646fe424..8f583b68198 100644 --- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/agg/AggregateReduceGroupingTest.xml +++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/agg/AggregateReduceGroupingTest.xml @@ -273,7 +273,7 @@ Calc(select=[a1, b1, c, EXPR$3]) +- Exchange(distribution=[hash[a1, c]]) +- LocalHashAggregate(groupBy=[a1, c], auxGrouping=[b1], select=[a1, c, b1, Partial_COUNT(d1) AS count$0]) +- Calc(select=[a1, b1, w0$o0 AS c, d1]) - +- OverAggregate(partitionBy=[c1], window#0=[COUNT(*) AS w0$o0 RANG BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING], select=[a1, b1, c1, d1, w0$o0]) + +- OverAggregate(partitionBy=[c1], window#0=[COUNT(*) AS w0$o0 RANGE BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING], select=[a1, b1, c1, d1, w0$o0]) +- Exchange(distribution=[forward]) +- Sort(orderBy=[c1 ASC]) +- Exchange(distribution=[hash[c1]]) diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/agg/OverAggregateTest.xml b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/agg/OverAggregateTest.xml index d75657f64e5..b9329e59478 100644 --- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/agg/OverAggregateTest.xml +++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/agg/OverAggregateTest.xml @@ -37,15 +37,15 @@ LogicalProject(EXPR$0=[CASE(>(COUNT($0) OVER (PARTITION BY $1 ORDER BY $0 NULLS <Resource name="optimized exec plan"> <![CDATA[ Calc(select=[CASE((w0$o0 > 0), w0$o1, null:INTEGER) AS EXPR$0, w1$o0 AS EXPR$1, CAST((CASE((w2$o0 > 0), w2$o1, null:INTEGER) / w2$o0) AS INTEGER) AS EXPR$2, w0$o2 AS EXPR$3, w2$o2 AS EXPR$4]) -+- OverAggregate(partitionBy=[c], orderBy=[a ASC], window#0=[COUNT(a) AS w2$o0, $SUM0(a) AS w2$o1, MIN(a) AS w2$o2 RANG BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], select=[a, b, c, w0$o0, w0$o1, w0$o2, w1$o0, w2$o0, w2$o1, w2$o2]) ++- OverAggregate(partitionBy=[c], orderBy=[a ASC], window#0=[COUNT(a) AS w2$o0, $SUM0(a) AS w2$o1, MIN(a) AS w2$o2 RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], select=[a, b, c, w0$o0, w0$o1, w0$o2, w1$o0, w2$o0, w2$o1, w2$o2]) +- Exchange(distribution=[forward]) +- Sort(orderBy=[c ASC, a ASC]) +- Exchange(distribution=[hash[c]]) - +- OverAggregate(partitionBy=[b], orderBy=[c ASC], window#0=[MAX(a) AS w1$o0 RANG BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], select=[a, b, c, w0$o0, w0$o1, w0$o2, w1$o0]) + +- OverAggregate(partitionBy=[b], orderBy=[c ASC], window#0=[MAX(a) AS w1$o0 RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], select=[a, b, c, w0$o0, w0$o1, w0$o2, w1$o0]) +- Exchange(distribution=[forward]) +- Sort(orderBy=[b ASC, c ASC]) +- Exchange(distribution=[keep_input_as_is[hash[b]]]) - +- OverAggregate(partitionBy=[b], orderBy=[a ASC], window#0=[COUNT(a) AS w0$o0, $SUM0(a) AS w0$o1 RANG BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], window#1=[RANK(*) AS w0$o2 RANG BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], select=[a, b, c, w0$o0, w0$o1, w0$o2]) + +- OverAggregate(partitionBy=[b], orderBy=[a ASC], window#0=[COUNT(a) AS w0$o0, $SUM0(a) AS w0$o1 RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], window#1=[RANK(*) AS w0$o2 RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], select=[a, b, c, w0$o0, w0$o1, w0$o2]) +- Exchange(distribution=[forward]) +- Sort(orderBy=[b ASC, a ASC]) +- Exchange(distribution=[hash[b]]) @@ -74,19 +74,19 @@ LogicalProject(EXPR$0=[CASE(>(COUNT($0) OVER (PARTITION BY $1 ORDER BY $2 NULLS <Resource name="optimized exec plan"> <![CDATA[ Calc(select=[CASE((w0$o0 > 0), w0$o1, null:INTEGER) AS EXPR$0, w1$o0 AS EXPR$1, w2$o0 AS EXPR$2, w0$o2 AS EXPR$3, CAST((CASE((w3$o0 > 0), w3$o1, null:INTEGER) / w3$o0) AS INTEGER) AS EXPR$4]) -+- OverAggregate(partitionBy=[c], orderBy=[a ASC], window#0=[MAX(a) AS w0$o0 RANG BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], select=[a, b, c, w0$o2, w1$o0, w0$o1, w3$o0, w3$o1, w2$o0, w0$o0]) ++- OverAggregate(partitionBy=[c], orderBy=[a ASC], window#0=[MAX(a) AS w0$o0 RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], select=[a, b, c, w0$o2, w1$o0, w0$o1, w3$o0, w3$o1, w2$o0, w0$o0]) +- Exchange(distribution=[forward]) +- Sort(orderBy=[c ASC, a ASC]) +- Exchange(distribution=[hash[c]]) - +- OverAggregate(partitionBy=[b], orderBy=[c ASC], window#0=[COUNT(a) AS w3$o0, $SUM0(a) AS w3$o1 RANG BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], window#1=[RANK(*) AS w2$o0 RANG BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], select=[a, b, c, w0$o2, w1$o0, w0$o1, w3$o0, w3$o1, w2$o0]) + +- OverAggregate(partitionBy=[b], orderBy=[c ASC], window#0=[COUNT(a) AS w3$o0, $SUM0(a) AS w3$o1 RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], window#1=[RANK(*) AS w2$o0 RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], select=[a, b, c, w0$o2, w1$o0, w0$o1, w3$o0, w3$o1, w2$o0]) +- Exchange(distribution=[forward]) +- Sort(orderBy=[b ASC, c ASC]) +- Exchange(distribution=[hash[b]]) - +- OverAggregate(orderBy=[c ASC, a ASC], window#0=[MIN(a) AS w0$o1 RANG BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], select=[a, b, c, w0$o2, w1$o0, w0$o1]) + +- OverAggregate(orderBy=[c ASC, a ASC], window#0=[MIN(a) AS w0$o1 RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], select=[a, b, c, w0$o2, w1$o0, w0$o1]) +- Exchange(distribution=[forward]) +- Sort(orderBy=[c ASC, a ASC]) +- Exchange(distribution=[forward]) - +- OverAggregate(orderBy=[b ASC], window#0=[COUNT(a) AS w0$o2, $SUM0(a) AS w1$o0 RANG BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], select=[a, b, c, w0$o2, w1$o0]) + +- OverAggregate(orderBy=[b ASC], window#0=[COUNT(a) AS w0$o2, $SUM0(a) AS w1$o0 RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], select=[a, b, c, w0$o2, w1$o0]) +- Sort(orderBy=[b ASC]) +- Exchange(distribution=[single]) +- TableSourceScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c]) @@ -111,11 +111,11 @@ LogicalProject(EXPR$0=[CASE(>(COUNT($0) OVER (PARTITION BY $1 ORDER BY $0 NULLS <Resource name="optimized exec plan"> <![CDATA[ Calc(select=[CASE((w0$o0 > 0), w0$o1, null:INTEGER) AS EXPR$0, w1$o0 AS EXPR$1]) -+- OverAggregate(partitionBy=[c], orderBy=[a ASC], window#0=[MAX(a) AS w1$o0 RANG BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], select=[a, b, c, w0$o0, w0$o1, w1$o0]) ++- OverAggregate(partitionBy=[c], orderBy=[a ASC], window#0=[MAX(a) AS w1$o0 RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], select=[a, b, c, w0$o0, w0$o1, w1$o0]) +- Exchange(distribution=[forward]) +- Sort(orderBy=[c ASC, a ASC]) +- Exchange(distribution=[hash[c]]) - +- OverAggregate(partitionBy=[b], orderBy=[a ASC], window#0=[COUNT(a) AS w0$o0, $SUM0(a) AS w0$o1 RANG BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], select=[a, b, c, w0$o0, w0$o1]) + +- OverAggregate(partitionBy=[b], orderBy=[a ASC], window#0=[COUNT(a) AS w0$o0, $SUM0(a) AS w0$o1 RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], select=[a, b, c, w0$o0, w0$o1]) +- Exchange(distribution=[forward]) +- Sort(orderBy=[b ASC, a ASC]) +- Exchange(distribution=[hash[b]]) @@ -146,7 +146,7 @@ LogicalProject(EXPR$0=[COUNT() OVER (PARTITION BY $2 ORDER BY $0 NULLS FIRST RAN Calc(select=[w0$o0 AS EXPR$0, CASE((w1$o0 > 0), w1$o1, null:INTEGER) AS EXPR$1, w2$o0 AS EXPR$2, CASE((w3$o0 > 0), w3$o1, null:INTEGER) AS EXPR$3, w4$o0 AS EXPR$4]) +- OverAggregate(partitionBy=[c], orderBy=[c ASC], window#0=[COUNT(*) AS w4$o0 ROWS BETWEEN 1 PRECEDING AND 10 FOLLOWING], select=[a, c, w0$o0, w1$o0, w1$o1, w3$o1, w2$o0, w3$o0, w4$o0]) +- Exchange(distribution=[forward]) - +- OverAggregate(partitionBy=[c], orderBy=[a ASC], window#0=[COUNT(*) AS w0$o0 RANG BETWEEN -1 PRECEDING AND 10 FOLLOWING], window#1=[COUNT(a) AS w1$o0, $SUM0(a) AS w1$o1 RANG BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], window#2=[COUNT(a) AS w3$o1, $SUM0(a) AS w2$o0 RANG BETWEEN 1 PRECEDING AND 10 FOLLOWING], window#3=[RANK(*) AS w3$o0 RANG BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], select=[a, c, w0$o0, w1$o0, w1$o1, w3$o1, w2$o0, w3$o0]) + +- OverAggregate(partitionBy=[c], orderBy=[a ASC], window#0=[COUNT(*) AS w0$o0 RANGE BETWEEN -1 PRECEDING AND 10 FOLLOWING], window#1=[COUNT(a) AS w1$o0, $SUM0(a) AS w1$o1 RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], window#2=[COUNT(a) AS w3$o1, $SUM0(a) AS w2$o0 RANGE BETWEEN 1 PRECEDING AND 10 FOLLOWING], window#3=[RANK(*) AS w3$o0 RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], select=[a, c, w0$o0, w1$o0, w1$o1, w3$o1, w2$o0, w3$o0]) +- Exchange(distribution=[forward]) +- Sort(orderBy=[c ASC, a ASC]) +- Exchange(distribution=[hash[c]]) @@ -250,7 +250,7 @@ LogicalProject(EXPR$0=[COUNT() OVER (PARTITION BY $2 ORDER BY $0 NULLS FIRST RAN <Resource name="optimized exec plan"> <![CDATA[ Calc(select=[w0$o0 AS $0]) -+- OverAggregate(partitionBy=[c], orderBy=[a ASC], window#0=[COUNT(*) AS w0$o0 RANG BETWEEN -1 PRECEDING AND 10 FOLLOWING], select=[a, c, w0$o0]) ++- OverAggregate(partitionBy=[c], orderBy=[a ASC], window#0=[COUNT(*) AS w0$o0 RANGE BETWEEN -1 PRECEDING AND 10 FOLLOWING], select=[a, c, w0$o0]) +- Exchange(distribution=[forward]) +- Sort(orderBy=[c ASC, a ASC]) +- Exchange(distribution=[hash[c]]) @@ -271,7 +271,7 @@ LogicalProject(c=[$2], EXPR$1=[COUNT() OVER (PARTITION BY $2)]) </Resource> <Resource name="optimized exec plan"> <![CDATA[ -OverAggregate(partitionBy=[c], window#0=[COUNT(*) AS w0$o0 RANG BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING], select=[c, w0$o0]) +OverAggregate(partitionBy=[c], window#0=[COUNT(*) AS w0$o0 RANGE BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING], select=[c, w0$o0]) +- Exchange(distribution=[forward]) +- Sort(orderBy=[c ASC]) +- Exchange(distribution=[hash[c]]) @@ -293,7 +293,7 @@ LogicalProject(a=[$0], EXPR$1=[DENSE_RANK() OVER (PARTITION BY $0 ORDER BY $3 NU <Resource name="optimized exec plan"> <![CDATA[ Calc(select=[a, w0$o0 AS $1]) -+- OverAggregate(partitionBy=[a], orderBy=[proctime ASC], window#0=[DENSE_RANK(*) AS w0$o0 RANG BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], select=[a, proctime, w0$o0]) ++- OverAggregate(partitionBy=[a], orderBy=[proctime ASC], window#0=[DENSE_RANK(*) AS w0$o0 RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], select=[a, proctime, w0$o0]) +- Exchange(distribution=[forward]) +- Sort(orderBy=[a ASC, proctime ASC]) +- Exchange(distribution=[hash[a]]) @@ -315,7 +315,7 @@ LogicalProject(a=[$0], EXPR$1=[RANK() OVER (PARTITION BY $0 ORDER BY $3 NULLS FI <Resource name="optimized exec plan"> <![CDATA[ Calc(select=[a, w0$o0 AS $1]) -+- OverAggregate(partitionBy=[a], orderBy=[proctime ASC], window#0=[RANK(*) AS w0$o0 RANG BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], select=[a, proctime, w0$o0]) ++- OverAggregate(partitionBy=[a], orderBy=[proctime ASC], window#0=[RANK(*) AS w0$o0 RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], select=[a, proctime, w0$o0]) +- Exchange(distribution=[forward]) +- Sort(orderBy=[a ASC, proctime ASC]) +- Exchange(distribution=[hash[a]]) @@ -337,7 +337,7 @@ LogicalProject(c=[$2], EXPR$1=[CASE(>(COUNT($0) OVER (ORDER BY $1 NULLS FIRST), <Resource name="optimized exec plan"> <![CDATA[ Calc(select=[c, CASE((w0$o0 > 0), w0$o1, null:INTEGER) AS EXPR$1]) -+- OverAggregate(orderBy=[b ASC], window#0=[COUNT(a) AS w0$o0, $SUM0(a) AS w0$o1 RANG BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], select=[a, b, c, w0$o0, w0$o1]) ++- OverAggregate(orderBy=[b ASC], window#0=[COUNT(a) AS w0$o0, $SUM0(a) AS w0$o1 RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], select=[a, b, c, w0$o0, w0$o1]) +- Sort(orderBy=[b ASC]) +- Exchange(distribution=[single]) +- TableSourceScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c]) @@ -356,7 +356,7 @@ LogicalProject(c=[$2], EXPR$1=[COUNT() OVER ()]) </Resource> <Resource name="optimized exec plan"> <![CDATA[ -OverAggregate(window#0=[COUNT(*) AS w0$o0 RANG BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING], select=[c, w0$o0]) +OverAggregate(window#0=[COUNT(*) AS w0$o0 RANGE BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING], select=[c, w0$o0]) +- Exchange(distribution=[single]) +- Calc(select=[c]) +- TableSourceScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c]) @@ -380,7 +380,7 @@ LogicalProject(EXPR$0=[COUNT() OVER (PARTITION BY $2 ORDER BY $0 NULLS FIRST RAN <Resource name="optimized exec plan"> <![CDATA[ Calc(select=[w0$o0 AS $0]) -+- OverAggregate(partitionBy=[c], orderBy=[a ASC], window#0=[COUNT(*) AS w0$o0 RANG BETWEEN -1 PRECEDING AND 10 FOLLOWING], select=[a, c, w0$o0]) ++- OverAggregate(partitionBy=[c], orderBy=[a ASC], window#0=[COUNT(*) AS w0$o0 RANGE BETWEEN -1 PRECEDING AND 10 FOLLOWING], select=[a, c, w0$o0]) +- Exchange(distribution=[forward]) +- Sort(orderBy=[c ASC, a ASC]) +- Exchange(distribution=[hash[c]]) @@ -406,7 +406,7 @@ LogicalProject(EXPR$0=[COUNT() OVER (PARTITION BY $2 ORDER BY $0 NULLS FIRST RAN <Resource name="optimized exec plan"> <![CDATA[ Calc(select=[w0$o0 AS $0]) -+- OverAggregate(partitionBy=[c], orderBy=[a ASC], window#0=[COUNT(*) AS w0$o0 RANG BETWEEN -1 FOLLOWING AND 10 FOLLOWING], select=[a, c, w0$o0]) ++- OverAggregate(partitionBy=[c], orderBy=[a ASC], window#0=[COUNT(*) AS w0$o0 RANGE BETWEEN -1 FOLLOWING AND 10 FOLLOWING], select=[a, c, w0$o0]) +- Exchange(distribution=[forward]) +- Sort(orderBy=[c ASC, a ASC]) +- Exchange(distribution=[hash[c]]) @@ -428,7 +428,7 @@ LogicalProject(EXPR$0=[RANK() OVER (PARTITION BY $2 ORDER BY $0 NULLS FIRST, $2 <Resource name="optimized exec plan"> <![CDATA[ Calc(select=[w0$o0 AS $0]) -+- OverAggregate(partitionBy=[c], orderBy=[a ASC, c ASC], window#0=[RANK(*) AS w0$o0 RANG BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], select=[a, c, w0$o0]) ++- OverAggregate(partitionBy=[c], orderBy=[a ASC, c ASC], window#0=[RANK(*) AS w0$o0 RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], select=[a, c, w0$o0]) +- Exchange(distribution=[forward]) +- Sort(orderBy=[c ASC, a ASC]) +- Exchange(distribution=[hash[c]]) @@ -449,7 +449,7 @@ LogicalProject(EXPR$0=[COUNT(1) OVER ()]) </Resource> <Resource name="optimized exec plan"> <![CDATA[ -OverAggregate(window#0=[COUNT(1) AS w0$o0 RANG BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING], select=[w0$o0]) +OverAggregate(window#0=[COUNT(1) AS w0$o0 RANGE BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING], select=[w0$o0]) +- Exchange(distribution=[single]) +- Calc(select=[]) +- TableSourceScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c]) @@ -474,7 +474,7 @@ LogicalProject(EXPR$0=[CASE(>(COUNT($0) OVER (PARTITION BY $2 ORDER BY $0 NULLS <Resource name="optimized exec plan"> <![CDATA[ Calc(select=[CASE((w0$o0 > 0), w0$o1, null:INTEGER) AS EXPR$0, w0$o2 AS EXPR$1]) -+- OverAggregate(partitionBy=[c], orderBy=[a ASC], window#0=[COUNT(a) AS w0$o0, $SUM0(a) AS w0$o1, COUNT(1) AS w0$o2 RANG BETWEEN -1 FOLLOWING AND 10 FOLLOWING], select=[a, c, w0$o0, w0$o1, w0$o2]) ++- OverAggregate(partitionBy=[c], orderBy=[a ASC], window#0=[COUNT(a) AS w0$o0, $SUM0(a) AS w0$o1, COUNT(1) AS w0$o2 RANGE BETWEEN -1 FOLLOWING AND 10 FOLLOWING], select=[a, c, w0$o0, w0$o1, w0$o2]) +- Exchange(distribution=[forward]) +- Sort(orderBy=[c ASC, a ASC]) +- Exchange(distribution=[hash[c]]) @@ -501,11 +501,11 @@ LogicalProject(EXPR$0=[COUNT(2) OVER (ORDER BY $0 NULLS FIRST)], EXPR$1=[COUNT(1 <Resource name="optimized exec plan"> <![CDATA[ Calc(select=[w0$o0 AS $0, w1$o0 AS $1]) -+- OverAggregate(partitionBy=[c], orderBy=[a ASC], window#0=[COUNT(1) AS w1$o0 RANG BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], select=[a, c, w0$o0, w1$o0]) ++- OverAggregate(partitionBy=[c], orderBy=[a ASC], window#0=[COUNT(1) AS w1$o0 RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], select=[a, c, w0$o0, w1$o0]) +- Exchange(distribution=[forward]) +- Sort(orderBy=[c ASC, a ASC]) +- Exchange(distribution=[hash[c]]) - +- OverAggregate(orderBy=[a ASC], window#0=[COUNT(2) AS w0$o0 RANG BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], select=[a, c, w0$o0]) + +- OverAggregate(orderBy=[a ASC], window#0=[COUNT(2) AS w0$o0 RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], select=[a, c, w0$o0]) +- Sort(orderBy=[a ASC]) +- Exchange(distribution=[single]) +- Calc(select=[a, c]) @@ -534,13 +534,13 @@ LogicalProject(EXPR$0=[CASE(>(COUNT($0) OVER (PARTITION BY $1 ORDER BY $2 NULLS <Resource name="optimized exec plan"> <![CDATA[ Calc(select=[CASE((w0$o0 > 0), w0$o1, null:INTEGER) AS EXPR$0, w1$o0 AS EXPR$1, CAST((CASE((w2$o0 > 0), w2$o1, null:INTEGER) / w2$o0) AS INTEGER) AS EXPR$2, w0$o2 AS EXPR$3, w1$o1 AS EXPR$4]) -+- OverAggregate(partitionBy=[b], orderBy=[c ASC], window#0=[COUNT(a) AS w2$o0, $SUM0(a) AS w2$o1 RANG BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], window#1=[RANK(*) AS w1$o0 RANG BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], select=[a, b, c, w0$o1, w0$o2, w1$o1, w0$o0, w2$o0, w2$o1, w1$o0]) ++- OverAggregate(partitionBy=[b], orderBy=[c ASC], window#0=[COUNT(a) AS w2$o0, $SUM0(a) AS w2$o1 RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], window#1=[RANK(*) AS w1$o0 RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], select=[a, b, c, w0$o1, w0$o2, w1$o1, w0$o0, w2$o0, w2$o1, w1$o0]) +- Exchange(distribution=[forward]) +- Sort(orderBy=[b ASC, c ASC]) +- Exchange(distribution=[keep_input_as_is[hash[b]]]) - +- OverAggregate(partitionBy=[b], orderBy=[b ASC], window#0=[MAX(a) AS w1$o1, MIN(a) AS w0$o0 RANG BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], select=[a, b, c, w0$o1, w0$o2, w1$o1, w0$o0]) + +- OverAggregate(partitionBy=[b], orderBy=[b ASC], window#0=[MAX(a) AS w1$o1, MIN(a) AS w0$o0 RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], select=[a, b, c, w0$o1, w0$o2, w1$o1, w0$o0]) +- Exchange(distribution=[forward]) - +- OverAggregate(partitionBy=[b], orderBy=[a ASC], window#0=[COUNT(a) AS w0$o1, $SUM0(a) AS w0$o2 RANG BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], select=[a, b, c, w0$o1, w0$o2]) + +- OverAggregate(partitionBy=[b], orderBy=[a ASC], window#0=[COUNT(a) AS w0$o1, $SUM0(a) AS w0$o2 RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], select=[a, b, c, w0$o1, w0$o2]) +- Exchange(distribution=[forward]) +- Sort(orderBy=[b ASC, a ASC]) +- Exchange(distribution=[hash[b]]) @@ -569,17 +569,17 @@ LogicalProject(EXPR$0=[CASE(>(COUNT($0) OVER (PARTITION BY $1 ORDER BY $2 NULLS <Resource name="optimized exec plan"> <![CDATA[ Calc(select=[CASE((w0$o0 > 0), w0$o1, null:INTEGER) AS EXPR$0, w1$o0 AS EXPR$1, CAST((CASE((w2$o0 > 0), w2$o1, null:INTEGER) / w2$o0) AS INTEGER) AS EXPR$2, w3$o0 AS EXPR$3, w4$o0 AS EXPR$4]) -+- OverAggregate(partitionBy=[b], orderBy=[c ASC], window#0=[COUNT(a) AS w1$o0, $SUM0(a) AS w3$o0 RANG BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], select=[a, b, c, w2$o0, w0$o0, w2$o1, w4$o0, w0$o1, w1$o0, w3$o0]) ++- OverAggregate(partitionBy=[b], orderBy=[c ASC], window#0=[COUNT(a) AS w1$o0, $SUM0(a) AS w3$o0 RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], select=[a, b, c, w2$o0, w0$o0, w2$o1, w4$o0, w0$o1, w1$o0, w3$o0]) +- Exchange(distribution=[forward]) +- Sort(orderBy=[b ASC, c ASC]) +- Exchange(distribution=[keep_input_as_is[hash[b]]]) - +- OverAggregate(partitionBy=[b], orderBy=[b ASC], window#0=[MIN(a) AS w0$o1 RANG BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], select=[a, b, c, w2$o0, w0$o0, w2$o1, w4$o0, w0$o1]) + +- OverAggregate(partitionBy=[b], orderBy=[b ASC], window#0=[MIN(a) AS w0$o1 RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], select=[a, b, c, w2$o0, w0$o0, w2$o1, w4$o0, w0$o1]) +- Exchange(distribution=[forward]) - +- OverAggregate(partitionBy=[b], orderBy=[a ASC, c ASC], window#0=[COUNT(a) AS w2$o1, $SUM0(a) AS w4$o0 RANG BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], select=[a, b, c, w2$o0, w0$o0, w2$o1, w4$o0]) + +- OverAggregate(partitionBy=[b], orderBy=[a ASC, c ASC], window#0=[COUNT(a) AS w2$o1, $SUM0(a) AS w4$o0 RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], select=[a, b, c, w2$o0, w0$o0, w2$o1, w4$o0]) +- Exchange(distribution=[forward]) +- Sort(orderBy=[b ASC, a ASC, c ASC]) +- Exchange(distribution=[keep_input_as_is[hash[b]]]) - +- OverAggregate(partitionBy=[b], orderBy=[a ASC], window#0=[MAX(a) AS w2$o0 RANG BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], window#1=[RANK(*) AS w0$o0 RANG BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], select=[a, b, c, w2$o0, w0$o0]) + +- OverAggregate(partitionBy=[b], orderBy=[a ASC], window#0=[MAX(a) AS w2$o0 RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], window#1=[RANK(*) AS w0$o0 RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], select=[a, b, c, w2$o0, w0$o0]) +- Exchange(distribution=[forward]) +- Sort(orderBy=[b ASC, a ASC]) +- Exchange(distribution=[hash[b]]) @@ -605,7 +605,7 @@ LogicalProject(EXPR$0=[CASE(>(COUNT($0) OVER (PARTITION BY $1 ORDER BY $0 NULLS <Resource name="optimized exec plan"> <![CDATA[ Calc(select=[CASE((w0$o0 > 0), w0$o1, null:INTEGER) AS EXPR$0, w0$o2 AS EXPR$1]) -+- OverAggregate(partitionBy=[b], orderBy=[a ASC], window#0=[COUNT(a) AS w0$o0, $SUM0(a) AS w0$o1, MAX(a) AS w0$o2 RANG BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], select=[a, b, w0$o0, w0$o1, w0$o2]) ++- OverAggregate(partitionBy=[b], orderBy=[a ASC], window#0=[COUNT(a) AS w0$o0, $SUM0(a) AS w0$o1, MAX(a) AS w0$o2 RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], select=[a, b, w0$o0, w0$o1, w0$o2]) +- Exchange(distribution=[forward]) +- Sort(orderBy=[b ASC, a ASC]) +- Exchange(distribution=[hash[b]]) @@ -635,11 +635,11 @@ LogicalProject(EXPR$0=[CASE(>(COUNT($0) OVER (PARTITION BY $1 ORDER BY $0 NULLS <Resource name="optimized exec plan"> <![CDATA[ Calc(select=[CASE((w0$o0 > 0), w0$o1, null:INTEGER) AS EXPR$0, w0$o2 AS EXPR$1, CAST((CASE((w1$o0 > 0), w1$o1, null:INTEGER) / w1$o0) AS INTEGER) AS EXPR$2, w0$o3 AS EXPR$3, w1$o2 AS EXPR$4]) -+- OverAggregate(partitionBy=[b], orderBy=[a DESC], window#0=[COUNT(a) AS w1$o0, $SUM0(a) AS w1$o1, MIN(a) AS w1$o2 RANG BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], select=[a, b, w0$o0, w0$o1, w0$o2, w0$o3, w1$o0, w1$o1, w1$o2]) ++- OverAggregate(partitionBy=[b], orderBy=[a DESC], window#0=[COUNT(a) AS w1$o0, $SUM0(a) AS w1$o1, MIN(a) AS w1$o2 RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], select=[a, b, w0$o0, w0$o1, w0$o2, w0$o3, w1$o0, w1$o1, w1$o2]) +- Exchange(distribution=[forward]) +- Sort(orderBy=[b ASC, a DESC]) +- Exchange(distribution=[keep_input_as_is[hash[b]]]) - +- OverAggregate(partitionBy=[b], orderBy=[a ASC], window#0=[COUNT(a) AS w0$o0, $SUM0(a) AS w0$o1, MAX(a) AS w0$o2 RANG BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], window#1=[RANK(*) AS w0$o3 RANG BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], select=[a, b, w0$o0, w0$o1, w0$o2, w0$o3]) + +- OverAggregate(partitionBy=[b], orderBy=[a ASC], window#0=[COUNT(a) AS w0$o0, $SUM0(a) AS w0$o1, MAX(a) AS w0$o2 RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], window#1=[RANK(*) AS w0$o3 RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], select=[a, b, w0$o0, w0$o1, w0$o2, w0$o3]) +- Exchange(distribution=[forward]) +- Sort(orderBy=[b ASC, a ASC]) +- Exchange(distribution=[hash[b]]) @@ -666,11 +666,11 @@ LogicalProject(EXPR$0=[RANK() OVER (PARTITION BY $1 ORDER BY $0 DESC NULLS LAST) <Resource name="optimized exec plan"> <![CDATA[ Calc(select=[w0$o0 AS $0, w1$o0 AS $1]) -+- OverAggregate(partitionBy=[b], orderBy=[a DESC], window#0=[RANK(*) AS w1$o0 RANG BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], select=[a, b, c, w0$o0, w1$o0]) ++- OverAggregate(partitionBy=[b], orderBy=[a DESC], window#0=[RANK(*) AS w1$o0 RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], select=[a, b, c, w0$o0, w1$o0]) +- Exchange(distribution=[forward]) +- Sort(orderBy=[b ASC, a DESC]) +- Exchange(distribution=[keep_input_as_is[hash[b]]]) - +- OverAggregate(partitionBy=[b], orderBy=[a ASC], window#0=[RANK(*) AS w0$o0 RANG BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], select=[a, b, c, w0$o0]) + +- OverAggregate(partitionBy=[b], orderBy=[a ASC], window#0=[RANK(*) AS w0$o0 RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], select=[a, b, c, w0$o0]) +- Exchange(distribution=[forward]) +- Sort(orderBy=[b ASC, a ASC]) +- Exchange(distribution=[hash[b]]) @@ -697,11 +697,11 @@ LogicalProject(EXPR$0=[CASE(>(COUNT($0) OVER (PARTITION BY $1 ORDER BY $0 NULLS <Resource name="optimized exec plan"> <![CDATA[ Calc(select=[CASE((w0$o0 > 0), w0$o1, null:INTEGER) AS EXPR$0, w1$o0 AS EXPR$1, w0$o2 AS EXPR$2]) -+- OverAggregate(partitionBy=[b], orderBy=[a ASC], window#0=[COUNT(a) AS w1$o0, $SUM0(a) AS w0$o0, MAX(a) AS w0$o1 RANG BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], select=[a, b, w0$o2, w1$o0, w0$o0, w0$o1]) ++- OverAggregate(partitionBy=[b], orderBy=[a ASC], window#0=[COUNT(a) AS w1$o0, $SUM0(a) AS w0$o0, MAX(a) AS w0$o1 RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], select=[a, b, w0$o2, w1$o0, w0$o0, w0$o1]) +- Exchange(distribution=[forward]) +- Sort(orderBy=[b ASC, a ASC]) +- Exchange(distribution=[keep_input_as_is[hash[b]]]) - +- OverAggregate(partitionBy=[b], window#0=[MIN(a) AS w0$o2 RANG BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING], select=[a, b, w0$o2]) + +- OverAggregate(partitionBy=[b], window#0=[MIN(a) AS w0$o2 RANGE BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING], select=[a, b, w0$o2]) +- Exchange(distribution=[forward]) +- Sort(orderBy=[b ASC]) +- Exchange(distribution=[hash[b]]) @@ -728,9 +728,9 @@ LogicalProject(a=[$0], EXPR$1=[RANK() OVER (PARTITION BY $1 ORDER BY $2 NULLS FI <Resource name="optimized exec plan"> <![CDATA[ Calc(select=[a, w0$o0 AS $1, w1$o0 AS $2]) -+- OverAggregate(partitionBy=[b], orderBy=[c ASC, b ASC], window#0=[RANK(*) AS w1$o0 RANG BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], select=[a, b, c, w0$o0, w1$o0]) ++- OverAggregate(partitionBy=[b], orderBy=[c ASC, b ASC], window#0=[RANK(*) AS w1$o0 RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], select=[a, b, c, w0$o0, w1$o0]) +- Exchange(distribution=[forward]) - +- OverAggregate(partitionBy=[b], orderBy=[c ASC, a DESC], window#0=[RANK(*) AS w0$o0 RANG BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], select=[a, b, c, w0$o0]) + +- OverAggregate(partitionBy=[b], orderBy=[c ASC, a DESC], window#0=[RANK(*) AS w0$o0 RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], select=[a, b, c, w0$o0]) +- Exchange(distribution=[forward]) +- Sort(orderBy=[b ASC, c ASC, a DESC]) +- Exchange(distribution=[hash[b]]) @@ -763,11 +763,11 @@ LogicalProject(a=[$0], b=[$1], ts=[$2], c1=[$3], c2=[$4]) </Resource> <Resource name="optimized exec plan"> <![CDATA[ -OverAggregate(partitionBy=[a], orderBy=[ts ASC], window#0=[COUNT(*) AS w0$o0_0 RANG BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], select=[a, b, ts, w0$o0, w0$o0_0]) +OverAggregate(partitionBy=[a], orderBy=[ts ASC], window#0=[COUNT(*) AS w0$o0_0 RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], select=[a, b, ts, w0$o0, w0$o0_0]) +- Exchange(distribution=[forward]) +- Sort(orderBy=[a ASC, ts ASC]) +- Exchange(distribution=[hash[a]]) - +- OverAggregate(partitionBy=[a, b], orderBy=[ts ASC], window#0=[COUNT(*) AS w0$o0 RANG BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], select=[a, b, ts, w0$o0]) + +- OverAggregate(partitionBy=[a, b], orderBy=[ts ASC], window#0=[COUNT(*) AS w0$o0 RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], select=[a, b, ts, w0$o0]) +- Exchange(distribution=[forward]) +- Sort(orderBy=[a ASC, b ASC, ts ASC]) +- Exchange(distribution=[hash[a, b]]) diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/join/BroadcastHashSemiAntiJoinTest.xml b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/join/BroadcastHashSemiAntiJoinTest.xml index 0f8b17670d4..f613399ae95 100644 --- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/join/BroadcastHashSemiAntiJoinTest.xml +++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/join/BroadcastHashSemiAntiJoinTest.xml @@ -735,7 +735,7 @@ HashJoin(joinType=[LeftSemiJoin], where=[((b = rk) AND (a <> d))], select=[a, b, :- TableSourceScan(table=[[default_catalog, default_database, l]], fields=[a, b, c]) +- Exchange(distribution=[broadcast]) +- Calc(select=[w0$o0 AS rk, d]) - +- OverAggregate(partitionBy=[d], orderBy=[e ASC], window#0=[RANK(*) AS w0$o0 RANG BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], select=[d, e, w0$o0]) + +- OverAggregate(partitionBy=[d], orderBy=[e ASC], window#0=[RANK(*) AS w0$o0 RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], select=[d, e, w0$o0]) +- Exchange(distribution=[forward]) +- Sort(orderBy=[d ASC, e ASC]) +- Exchange(distribution=[hash[d]]) @@ -1167,11 +1167,11 @@ HashJoin(joinType=[LeftSemiJoin], where=[((a = $0) AND (b = $1))], select=[a, b, :- TableSourceScan(table=[[default_catalog, default_database, l]], fields=[a, b, c]) +- Exchange(distribution=[broadcast]) +- Calc(select=[w0$o0 AS $0, w1$o0 AS $1]) - +- OverAggregate(partitionBy=[f], orderBy=[d ASC], window#0=[MIN(e) AS w1$o0 RANG BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], select=[d, e, f, w0$o0, w1$o0]) + +- OverAggregate(partitionBy=[f], orderBy=[d ASC], window#0=[MIN(e) AS w1$o0 RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], select=[d, e, f, w0$o0, w1$o0]) +- Exchange(distribution=[forward]) +- Sort(orderBy=[f ASC, d ASC]) +- Exchange(distribution=[hash[f]]) - +- OverAggregate(window#0=[MAX(d) AS w0$o0 RANG BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING], select=[d, e, f, w0$o0]) + +- OverAggregate(window#0=[MAX(d) AS w0$o0 RANGE BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING], select=[d, e, f, w0$o0]) +- Exchange(distribution=[single]) +- TableSourceScan(table=[[default_catalog, default_database, r]], fields=[d, e, f]) ]]> diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/join/NestedLoopSemiAntiJoinTest.xml b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/join/NestedLoopSemiAntiJoinTest.xml index 094c0b1f4f2..fed6b988a93 100644 --- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/join/NestedLoopSemiAntiJoinTest.xml +++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/join/NestedLoopSemiAntiJoinTest.xml @@ -917,7 +917,7 @@ NestedLoopJoin(joinType=[LeftSemiJoin], where=[((b = rk) AND (a <> d))], select= :- TableSourceScan(table=[[default_catalog, default_database, l]], fields=[a, b, c]) +- Exchange(distribution=[broadcast]) +- Calc(select=[w0$o0 AS rk, d]) - +- OverAggregate(partitionBy=[d], orderBy=[e ASC], window#0=[RANK(*) AS w0$o0 RANG BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], select=[d, e, w0$o0]) + +- OverAggregate(partitionBy=[d], orderBy=[e ASC], window#0=[RANK(*) AS w0$o0 RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], select=[d, e, w0$o0]) +- Exchange(distribution=[forward]) +- Sort(orderBy=[d ASC, e ASC]) +- Exchange(distribution=[hash[d]]) @@ -1434,11 +1434,11 @@ NestedLoopJoin(joinType=[LeftSemiJoin], where=[((a = $0) AND (b = $1))], select= :- TableSourceScan(table=[[default_catalog, default_database, l]], fields=[a, b, c]) +- Exchange(distribution=[broadcast]) +- Calc(select=[w0$o0 AS $0, w1$o0 AS $1]) - +- OverAggregate(partitionBy=[f], orderBy=[d ASC], window#0=[MIN(e) AS w1$o0 RANG BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], select=[d, e, f, w0$o0, w1$o0]) + +- OverAggregate(partitionBy=[f], orderBy=[d ASC], window#0=[MIN(e) AS w1$o0 RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], select=[d, e, f, w0$o0, w1$o0]) +- Exchange(distribution=[forward]) +- Sort(orderBy=[f ASC, d ASC]) +- Exchange(distribution=[hash[f]]) - +- OverAggregate(window#0=[MAX(d) AS w0$o0 RANG BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING], select=[d, e, f, w0$o0]) + +- OverAggregate(window#0=[MAX(d) AS w0$o0 RANGE BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING], select=[d, e, f, w0$o0]) +- Exchange(distribution=[single]) +- TableSourceScan(table=[[default_catalog, default_database, r]], fields=[d, e, f]) ]]> diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/join/SemiAntiJoinTest.xml b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/join/SemiAntiJoinTest.xml index ec20c141adf..a975f2bd917 100644 --- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/join/SemiAntiJoinTest.xml +++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/join/SemiAntiJoinTest.xml @@ -950,7 +950,7 @@ HashJoin(joinType=[LeftSemiJoin], where=[((b = rk) AND (a <> d))], select=[a, b, : +- TableSourceScan(table=[[default_catalog, default_database, l]], fields=[a, b, c]) +- Exchange(distribution=[hash[rk]]) +- Calc(select=[w0$o0 AS rk, d]) - +- OverAggregate(partitionBy=[d], orderBy=[e ASC], window#0=[RANK(*) AS w0$o0 RANG BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], select=[d, e, w0$o0]) + +- OverAggregate(partitionBy=[d], orderBy=[e ASC], window#0=[RANK(*) AS w0$o0 RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], select=[d, e, w0$o0]) +- Exchange(distribution=[forward]) +- Sort(orderBy=[d ASC, e ASC]) +- Exchange(distribution=[hash[d]]) @@ -1488,11 +1488,11 @@ HashJoin(joinType=[LeftSemiJoin], where=[((a = $0) AND (b = $1))], select=[a, b, : +- TableSourceScan(table=[[default_catalog, default_database, l]], fields=[a, b, c]) +- Exchange(distribution=[hash[$0, $1]]) +- Calc(select=[w0$o0 AS $0, w1$o0 AS $1]) - +- OverAggregate(partitionBy=[f], orderBy=[d ASC], window#0=[MIN(e) AS w1$o0 RANG BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], select=[d, e, f, w0$o0, w1$o0]) + +- OverAggregate(partitionBy=[f], orderBy=[d ASC], window#0=[MIN(e) AS w1$o0 RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], select=[d, e, f, w0$o0, w1$o0]) +- Exchange(distribution=[forward]) +- Sort(orderBy=[f ASC, d ASC]) +- Exchange(distribution=[hash[f]]) - +- OverAggregate(window#0=[MAX(d) AS w0$o0 RANG BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING], select=[d, e, f, w0$o0]) + +- OverAggregate(window#0=[MAX(d) AS w0$o0 RANGE BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING], select=[d, e, f, w0$o0]) +- Exchange(distribution=[single]) +- TableSourceScan(table=[[default_catalog, default_database, r]], fields=[d, e, f]) ]]> diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/join/ShuffledHashSemiAntiJoinTest.xml b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/join/ShuffledHashSemiAntiJoinTest.xml index 8f29cd63d08..00927a6bb00 100644 --- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/join/ShuffledHashSemiAntiJoinTest.xml +++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/join/ShuffledHashSemiAntiJoinTest.xml @@ -800,7 +800,7 @@ HashJoin(joinType=[LeftSemiJoin], where=[((b = rk) AND (a <> d))], select=[a, b, : +- TableSourceScan(table=[[default_catalog, default_database, l]], fields=[a, b, c]) +- Exchange(distribution=[hash[rk]]) +- Calc(select=[w0$o0 AS rk, d]) - +- OverAggregate(partitionBy=[d], orderBy=[e ASC], window#0=[RANK(*) AS w0$o0 RANG BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], select=[d, e, w0$o0]) + +- OverAggregate(partitionBy=[d], orderBy=[e ASC], window#0=[RANK(*) AS w0$o0 RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], select=[d, e, w0$o0]) +- Exchange(distribution=[forward]) +- Sort(orderBy=[d ASC, e ASC]) +- Exchange(distribution=[hash[d]]) @@ -1291,11 +1291,11 @@ HashJoin(joinType=[LeftSemiJoin], where=[((a = $0) AND (b = $1))], select=[a, b, : +- TableSourceScan(table=[[default_catalog, default_database, l]], fields=[a, b, c]) +- Exchange(distribution=[hash[$0, $1]]) +- Calc(select=[w0$o0 AS $0, w1$o0 AS $1]) - +- OverAggregate(partitionBy=[f], orderBy=[d ASC], window#0=[MIN(e) AS w1$o0 RANG BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], select=[d, e, f, w0$o0, w1$o0]) + +- OverAggregate(partitionBy=[f], orderBy=[d ASC], window#0=[MIN(e) AS w1$o0 RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], select=[d, e, f, w0$o0, w1$o0]) +- Exchange(distribution=[forward]) +- Sort(orderBy=[f ASC, d ASC]) +- Exchange(distribution=[hash[f]]) - +- OverAggregate(window#0=[MAX(d) AS w0$o0 RANG BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING], select=[d, e, f, w0$o0]) + +- OverAggregate(window#0=[MAX(d) AS w0$o0 RANGE BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING], select=[d, e, f, w0$o0]) +- Exchange(distribution=[single]) +- TableSourceScan(table=[[default_catalog, default_database, r]], fields=[d, e, f]) ]]> diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/join/SortMergeSemiAntiJoinTest.xml b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/join/SortMergeSemiAntiJoinTest.xml index fd85c5db67a..17b7e1066d9 100644 --- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/join/SortMergeSemiAntiJoinTest.xml +++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/join/SortMergeSemiAntiJoinTest.xml @@ -800,7 +800,7 @@ SortMergeJoin(joinType=[LeftSemiJoin], where=[((b = rk) AND (a <> d))], select=[ : +- TableSourceScan(table=[[default_catalog, default_database, l]], fields=[a, b, c]) +- Exchange(distribution=[hash[rk]]) +- Calc(select=[w0$o0 AS rk, d]) - +- OverAggregate(partitionBy=[d], orderBy=[e ASC], window#0=[RANK(*) AS w0$o0 RANG BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], select=[d, e, w0$o0]) + +- OverAggregate(partitionBy=[d], orderBy=[e ASC], window#0=[RANK(*) AS w0$o0 RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], select=[d, e, w0$o0]) +- Exchange(distribution=[forward]) +- Sort(orderBy=[d ASC, e ASC]) +- Exchange(distribution=[hash[d]]) @@ -1291,11 +1291,11 @@ SortMergeJoin(joinType=[LeftSemiJoin], where=[((a = $0) AND (b = $1))], select=[ : +- TableSourceScan(table=[[default_catalog, default_database, l]], fields=[a, b, c]) +- Exchange(distribution=[hash[$0, $1]]) +- Calc(select=[w0$o0 AS $0, w1$o0 AS $1]) - +- OverAggregate(partitionBy=[f], orderBy=[d ASC], window#0=[MIN(e) AS w1$o0 RANG BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], select=[d, e, f, w0$o0, w1$o0]) + +- OverAggregate(partitionBy=[f], orderBy=[d ASC], window#0=[MIN(e) AS w1$o0 RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], select=[d, e, f, w0$o0, w1$o0]) +- Exchange(distribution=[forward]) +- Sort(orderBy=[f ASC, d ASC]) +- Exchange(distribution=[hash[f]]) - +- OverAggregate(window#0=[MAX(d) AS w0$o0 RANG BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING], select=[d, e, f, w0$o0]) + +- OverAggregate(window#0=[MAX(d) AS w0$o0 RANGE BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING], select=[d, e, f, w0$o0]) +- Exchange(distribution=[single]) +- TableSourceScan(table=[[default_catalog, default_database, r]], fields=[d, e, f]) ]]> diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/table/PythonOverWindowAggregateTest.xml b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/table/PythonOverWindowAggregateTest.xml index db2295d8a93..ceb66b66085 100644 --- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/table/PythonOverWindowAggregateTest.xml +++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/table/PythonOverWindowAggregateTest.xml @@ -26,7 +26,7 @@ LogicalProject(b=[$1], _c1=[AS(*org.apache.flink.table.planner.runtime.utils.Jav <Resource name="optimized exec plan"> <![CDATA[ Calc(select=[b, w0$o0 AS _c1]) -+- PythonOverAggregate(partitionBy=[b], orderBy=[rowtime ASC], window#0=[PandasAggregateFunction(a, c) AS w0$o0 RANG BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], select=[a, b, c, rowtime, w0$o0]) ++- PythonOverAggregate(partitionBy=[b], orderBy=[rowtime ASC], window#0=[PandasAggregateFunction(a, c) AS w0$o0 RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], select=[a, b, c, rowtime, w0$o0]) +- Exchange(distribution=[forward]) +- Sort(orderBy=[b ASC, rowtime ASC]) +- Exchange(distribution=[hash[b]]) diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/PythonOverAggregateJsonPlanTest_jsonplan/testProcTimeBoundedNonPartitionedRangeOver.out b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/PythonOverAggregateJsonPlanTest_jsonplan/testProcTimeBoundedNonPartitionedRangeOver.out index bea3a1febca..6993dc3860e 100644 --- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/PythonOverAggregateJsonPlanTest_jsonplan/testProcTimeBoundedNonPartitionedRangeOver.out +++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/PythonOverAggregateJsonPlanTest_jsonplan/testProcTimeBoundedNonPartitionedRangeOver.out @@ -327,7 +327,7 @@ "fieldType" : "BIGINT" } ] }, - "description" : "PythonOverAggregate(orderBy=[proctime ASC], window=[ RANG BETWEEN 10000 PRECEDING AND CURRENT ROW], select=[c, proctime, $2, pyFunc(c, c) AS w0$o0])" + "description" : "PythonOverAggregate(orderBy=[proctime ASC], window=[ RANGE BETWEEN 10000 PRECEDING AND CURRENT ROW], select=[c, proctime, $2, pyFunc(c, c) AS w0$o0])" }, { "id" : 7, "type" : "stream-exec-calc_1", diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/PythonOverAggregateJsonPlanTest_jsonplan/testProcTimeBoundedPartitionedRangeOver.out b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/PythonOverAggregateJsonPlanTest_jsonplan/testProcTimeBoundedPartitionedRangeOver.out index 43e9007f0a4..85933d1df57 100644 --- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/PythonOverAggregateJsonPlanTest_jsonplan/testProcTimeBoundedPartitionedRangeOver.out +++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/PythonOverAggregateJsonPlanTest_jsonplan/testProcTimeBoundedPartitionedRangeOver.out @@ -341,7 +341,7 @@ "fieldType" : "BIGINT" } ] }, - "description" : "PythonOverAggregate(partitionBy=[a], orderBy=[proctime ASC], window=[ RANG BETWEEN 7200000 PRECEDING AND CURRENT ROW], select=[a, c, proctime, $3, pyFunc(c, c) AS w0$o0])" + "description" : "PythonOverAggregate(partitionBy=[a], orderBy=[proctime ASC], window=[ RANGE BETWEEN 7200000 PRECEDING AND CURRENT ROW], select=[a, c, proctime, $3, pyFunc(c, c) AS w0$o0])" }, { "id" : 7, "type" : "stream-exec-calc_1", diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/PythonOverAggregateJsonPlanTest_jsonplan/testProcTimeUnboundedPartitionedRangeOver.out b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/PythonOverAggregateJsonPlanTest_jsonplan/testProcTimeUnboundedPartitionedRangeOver.out index 51e3cd8bdc5..4480abaf70f 100644 --- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/PythonOverAggregateJsonPlanTest_jsonplan/testProcTimeUnboundedPartitionedRangeOver.out +++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/PythonOverAggregateJsonPlanTest_jsonplan/testProcTimeUnboundedPartitionedRangeOver.out @@ -331,7 +331,7 @@ "fieldType" : "BIGINT" } ] }, - "description" : "PythonOverAggregate(partitionBy=[a], orderBy=[proctime ASC], window=[ RANG BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], select=[a, c, proctime, $3, pyFunc(c, c) AS w0$o0])" + "description" : "PythonOverAggregate(partitionBy=[a], orderBy=[proctime ASC], window=[ RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], select=[a, c, proctime, $3, pyFunc(c, c) AS w0$o0])" }, { "id" : 7, "type" : "stream-exec-calc_1", diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/rules/physical/batch/PushLocalAggIntoTableSourceScanRuleTest.xml b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/rules/physical/batch/PushLocalAggIntoTableSourceScanRuleTest.xml index 00ec6819914..0c60d04880e 100644 --- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/rules/physical/batch/PushLocalAggIntoTableSourceScanRuleTest.xml +++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/rules/physical/batch/PushLocalAggIntoTableSourceScanRuleTest.xml @@ -223,7 +223,7 @@ LogicalProject(id=[$0], amount=[$2], EXPR$2=[CASE(>(COUNT($3) OVER (PARTITION BY <Resource name="optimized rel plan"> <![CDATA[ Calc(select=[id, amount, CASE(>(w0$o0, 0), w0$o1, null:BIGINT) AS EXPR$2, name]) -+- OverAggregate(partitionBy=[name], window#0=[COUNT(price) AS w0$o0, $SUM0(price) AS w0$o1 RANG BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING], select=[id, name, amount, price, w0$o0, w0$o1]) ++- OverAggregate(partitionBy=[name], window#0=[COUNT(price) AS w0$o0, $SUM0(price) AS w0$o1 RANGE BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING], select=[id, name, amount, price, w0$o0, w0$o1]) +- Sort(orderBy=[name ASC]) +- Exchange(distribution=[hash[name]]) +- TableSourceScan(table=[[default_catalog, default_database, inventory, project=[id, name, amount, price], metadata=[]]], fields=[id, name, amount, price]) diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/MatchRecognizeTest.xml b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/MatchRecognizeTest.xml index 1877061f413..3331876c2bd 100644 --- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/MatchRecognizeTest.xml +++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/MatchRecognizeTest.xml @@ -152,7 +152,7 @@ LogicalProject(symbol=[$0], price=[$1], tax=[$2], matchRowtime=[$3], price_sum=[ <Resource name="optimized rel plan"> <![CDATA[ Calc(select=[symbol, price, tax, matchRowtime, CASE(>(w0$o0, 0), w0$o1, null:INTEGER) AS price_sum]), rowType=[RecordType(VARCHAR(2147483647) symbol, INTEGER price, INTEGER tax, TIMESTAMP_LTZ(3) *ROWTIME* matchRowtime, INTEGER price_sum)] -+- OverAggregate(partitionBy=[symbol], orderBy=[matchRowtime ASC], window=[ RANG BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], select=[symbol, price, tax, matchRowtime, COUNT(price) AS w0$o0, $SUM0(price) AS w0$o1]), rowType=[RecordType(VARCHAR(2147483647) symbol, INTEGER price, INTEGER tax, TIMESTAMP_LTZ(3) *ROWTIME* matchRowtime, BIGINT w0$o0, INTEGER w0$o1)] ++- OverAggregate(partitionBy=[symbol], orderBy=[matchRowtime ASC], window=[ RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], select=[symbol, price, tax, matchRowtime, COUNT(price) AS w0$o0, $SUM0(price) AS w0$o1]), rowType=[RecordType(VARCHAR(2147483647) symbol, INTEGER price, INTEGER tax, TIMESTAMP_LTZ(3) *ROWTIME* matchRowtime, BIGINT w0$o0, INTEGER w0$o1)] +- Exchange(distribution=[hash[symbol]]), rowType=[RecordType(VARCHAR(2147483647) symbol, INTEGER price, INTEGER tax, TIMESTAMP_LTZ(3) *ROWTIME* matchRowtime)] +- Match(partitionBy=[symbol], orderBy=[ts_ltz ASC], measures=[FINAL(A.price) AS price, FINAL(A.tax) AS tax, FINAL(MATCH_ROWTIME(*.ts_ltz)) AS matchRowtime], rowsPerMatch=[ONE ROW PER MATCH], after=[SKIP TO NEXT ROW], pattern=[_UTF-16LE'A'], define=[{A=>(PREV(A.$2, 0), 0)}]), rowType=[RecordType(VARCHAR(2147483647) symbol, INTEGER price, INTEGER tax, TIMESTAMP_LTZ(3) *ROWTIME* matchRowtime)] +- Exchange(distribution=[hash[symbol]]), rowType=[RecordType(VARCHAR(2147483647) symbol, TIMESTAMP_LTZ(3) *ROWTIME* ts_ltz, INTEGER price, INTEGER tax)] diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/SubplanReuseTest.xml b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/SubplanReuseTest.xml index e1f9644f71e..5b5622d341b 100644 --- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/SubplanReuseTest.xml +++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/SubplanReuseTest.xml @@ -752,7 +752,7 @@ LogicalProject(a=[$0], b=[$1], EXPR$2=[$2], a0=[$3], b0=[$4], EXPR$20=[$5]) Join(joinType=[InnerJoin], where=[(a = a0)], select=[a, b, w0$o0, a0, b0, w0$o00], leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey]) :- Exchange(distribution=[hash[a]]) : +- Calc(select=[a, b, w0$o0], where=[(b < 100)]) -: +- OverAggregate(orderBy=[c DESC], window=[ RANG BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], select=[a, b, c, RANK(*) AS w0$o0])(reuse_id=[1]) +: +- OverAggregate(orderBy=[c DESC], window=[ RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], select=[a, b, c, RANK(*) AS w0$o0])(reuse_id=[1]) : +- Exchange(distribution=[single]) : +- TableSourceScan(table=[[default_catalog, default_database, x]], fields=[a, b, c]) +- Exchange(distribution=[hash[a]]) @@ -821,7 +821,7 @@ LogicalProject(a=[$0], b=[$1], EXPR$2=[$2], a0=[$3], b0=[$4], EXPR$20=[$5]) Join(joinType=[InnerJoin], where=[(a = a0)], select=[a, b, w0$o0, a0, b0, w0$o00], leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey]) :- Exchange(distribution=[hash[a]]) : +- Calc(select=[a, b, w0$o0], where=[(b < 100)]) -: +- OverAggregate(partitionBy=[c], orderBy=[c DESC], window=[ RANG BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], select=[a, b, c, MyFirst(c) AS w0$o0])(reuse_id=[1]) +: +- OverAggregate(partitionBy=[c], orderBy=[c DESC], window=[ RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], select=[a, b, c, MyFirst(c) AS w0$o0])(reuse_id=[1]) : +- Exchange(distribution=[hash[c]]) : +- TableSourceScan(table=[[default_catalog, default_database, x]], fields=[a, b, c]) +- Exchange(distribution=[hash[a]]) diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/agg/OverAggregateTest.xml b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/agg/OverAggregateTest.xml index 083cb10cf0e..35c13da188e 100644 --- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/agg/OverAggregateTest.xml +++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/agg/OverAggregateTest.xml @@ -42,9 +42,9 @@ LogicalProject(a=[$0], b=[$1], ts=[$2], c1=[$3], c2=[$4]) </Resource> <Resource name="optimized exec plan"> <![CDATA[ -OverAggregate(partitionBy=[a], orderBy=[ts ASC], window=[ RANG BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], select=[a, b, ts, w0$o0, COUNT(*) AS w0$o0_0]) +OverAggregate(partitionBy=[a], orderBy=[ts ASC], window=[ RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], select=[a, b, ts, w0$o0, COUNT(*) AS w0$o0_0]) +- Exchange(distribution=[hash[a]]) - +- OverAggregate(partitionBy=[a, b], orderBy=[ts ASC], window=[ RANG BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], select=[a, b, ts, COUNT(*) AS w0$o0]) + +- OverAggregate(partitionBy=[a, b], orderBy=[ts ASC], window=[ RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], select=[a, b, ts, COUNT(*) AS w0$o0]) +- Exchange(distribution=[hash[a, b]]) +- WatermarkAssigner(rowtime=[ts], watermark=[ts]) +- TableSourceScan(table=[[default_catalog, default_database, src]], fields=[a, b, ts]) @@ -127,7 +127,7 @@ LogicalProject(a=[$0], EXPR$1=[COUNT($2) OVER (ORDER BY $3 NULLS FIRST RANGE 100 <Resource name="optimized exec plan"> <![CDATA[ Calc(select=[a, w0$o0 AS $1]) -+- OverAggregate(orderBy=[proctime ASC], window=[ RANG BETWEEN 10000 PRECEDING AND CURRENT ROW], select=[a, c, proctime, COUNT(c) AS w0$o0]) ++- OverAggregate(orderBy=[proctime ASC], window=[ RANGE BETWEEN 10000 PRECEDING AND CURRENT ROW], select=[a, c, proctime, COUNT(c) AS w0$o0]) +- Exchange(distribution=[single]) +- Calc(select=[a, c, proctime]) +- DataStreamScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c, proctime, rowtime]) @@ -176,7 +176,7 @@ LogicalProject(a=[$0], avgA=[/(CASE(>(COUNT($2) OVER (PARTITION BY $0 ORDER BY $ <Resource name="optimized exec plan"> <![CDATA[ Calc(select=[a, (CASE((w0$o0 > 0), w0$o1, null:BIGINT) / w0$o0) AS avgA]) -+- OverAggregate(partitionBy=[a], orderBy=[proctime ASC], window=[ RANG BETWEEN 7200000 PRECEDING AND CURRENT ROW], select=[a, c, proctime, COUNT(c) AS w0$o0, $SUM0(c) AS w0$o1]) ++- OverAggregate(partitionBy=[a], orderBy=[proctime ASC], window=[ RANGE BETWEEN 7200000 PRECEDING AND CURRENT ROW], select=[a, c, proctime, COUNT(c) AS w0$o0, $SUM0(c) AS w0$o1]) +- Exchange(distribution=[hash[a]]) +- Calc(select=[a, c, proctime]) +- DataStreamScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c, proctime, rowtime]) @@ -248,7 +248,7 @@ LogicalProject(c=[$2], cnt1=[COUNT($0) OVER (ORDER BY $3 NULLS FIRST)], cnt2=[CA <Resource name="optimized exec plan"> <![CDATA[ Calc(select=[c, w0$o0 AS cnt1, CASE((w0$o0 > 0), w0$o1, null:INTEGER) AS cnt2]) -+- OverAggregate(orderBy=[proctime ASC], window=[ RANG BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], select=[a, c, proctime, COUNT(a) AS w0$o0, $SUM0(a) AS w0$o1]) ++- OverAggregate(orderBy=[proctime ASC], window=[ RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], select=[a, c, proctime, COUNT(a) AS w0$o0, $SUM0(a) AS w0$o1]) +- Exchange(distribution=[single]) +- Calc(select=[a, c, proctime]) +- DataStreamScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c, proctime, rowtime]) @@ -296,7 +296,7 @@ LogicalProject(c=[$2], cnt1=[COUNT($0) OVER (PARTITION BY $2 ORDER BY $3 NULLS F <Resource name="optimized exec plan"> <![CDATA[ Calc(select=[c, w0$o0 AS cnt1, CASE((w0$o0 > 0), w0$o1, null:INTEGER) AS cnt2]) -+- OverAggregate(partitionBy=[c], orderBy=[proctime ASC], window=[ RANG BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], select=[a, c, proctime, COUNT(a) AS w0$o0, $SUM0(a) AS w0$o1]) ++- OverAggregate(partitionBy=[c], orderBy=[proctime ASC], window=[ RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], select=[a, c, proctime, COUNT(a) AS w0$o0, $SUM0(a) AS w0$o1]) +- Exchange(distribution=[hash[c]]) +- Calc(select=[a, c, proctime]) +- DataStreamScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c, proctime, rowtime]) @@ -345,7 +345,7 @@ LogicalProject(c=[$2], cnt1=[COUNT($0) OVER (ORDER BY $4 NULLS FIRST RANGE 1000: <Resource name="optimized exec plan"> <![CDATA[ Calc(select=[c, w0$o0 AS $1]) -+- OverAggregate(orderBy=[rowtime ASC], window=[ RANG BETWEEN 1000 PRECEDING AND CURRENT ROW], select=[a, c, rowtime, COUNT(a) AS w0$o0]) ++- OverAggregate(orderBy=[rowtime ASC], window=[ RANGE BETWEEN 1000 PRECEDING AND CURRENT ROW], select=[a, c, rowtime, COUNT(a) AS w0$o0]) +- Exchange(distribution=[single]) +- Calc(select=[a, c, rowtime]) +- DataStreamScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c, proctime, rowtime]) @@ -394,7 +394,7 @@ LogicalProject(c=[$2], EXPR$1=[COUNT($0) OVER (PARTITION BY $2 ORDER BY $4 NULLS <Resource name="optimized exec plan"> <![CDATA[ Calc(select=[c, w0$o0 AS $1]) -+- OverAggregate(partitionBy=[c], orderBy=[rowtime ASC], window=[ RANG BETWEEN 1000 PRECEDING AND CURRENT ROW], select=[a, c, rowtime, COUNT(a) AS w0$o0]) ++- OverAggregate(partitionBy=[c], orderBy=[rowtime ASC], window=[ RANGE BETWEEN 1000 PRECEDING AND CURRENT ROW], select=[a, c, rowtime, COUNT(a) AS w0$o0]) +- Exchange(distribution=[hash[c]]) +- Calc(select=[a, c, rowtime]) +- DataStreamScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c, proctime, rowtime]) @@ -444,7 +444,7 @@ LogicalProject(c=[$2], cnt1=[COUNT($0) OVER (ORDER BY $4 NULLS FIRST)], cnt2=[CA <Resource name="optimized exec plan"> <![CDATA[ Calc(select=[c, w0$o0 AS cnt1, CASE((w0$o0 > 0), w0$o1, null:INTEGER) AS cnt2]) -+- OverAggregate(orderBy=[rowtime ASC], window=[ RANG BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], select=[a, c, rowtime, COUNT(a) AS w0$o0, $SUM0(a) AS w0$o1]) ++- OverAggregate(orderBy=[rowtime ASC], window=[ RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], select=[a, c, rowtime, COUNT(a) AS w0$o0, $SUM0(a) AS w0$o1]) +- Exchange(distribution=[single]) +- Calc(select=[a, c, rowtime]) +- DataStreamScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c, proctime, rowtime]) @@ -494,7 +494,7 @@ LogicalProject(c=[$2], cnt1=[COUNT($0) OVER (PARTITION BY $2 ORDER BY $4 NULLS F <Resource name="optimized exec plan"> <![CDATA[ Calc(select=[c, w0$o0 AS cnt1, CASE((w0$o0 > 0), w0$o1, null:INTEGER) AS cnt2]) -+- OverAggregate(partitionBy=[c], orderBy=[rowtime ASC], window=[ RANG BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], select=[a, c, rowtime, COUNT(a) AS w0$o0, $SUM0(a) AS w0$o1]) ++- OverAggregate(partitionBy=[c], orderBy=[rowtime ASC], window=[ RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], select=[a, c, rowtime, COUNT(a) AS w0$o0, $SUM0(a) AS w0$o1]) +- Exchange(distribution=[hash[c]]) +- Calc(select=[a, c, rowtime]) +- DataStreamScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c, proctime, rowtime]) @@ -593,7 +593,7 @@ LogicalProject(c=[$2], cnt1=[COUNT($0) OVER (PARTITION BY $2)]) <Resource name="optimized exec plan"> <![CDATA[ Calc(select=[c, w0$o0 AS $1]) -+- OverAggregate(partitionBy=[c], orderBy=[], window=[ RANG BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING], select=[a, b, c, proctime, rowtime, COUNT(a) AS w0$o0]) ++- OverAggregate(partitionBy=[c], orderBy=[], window=[ RANGE BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING], select=[a, b, c, proctime, rowtime, COUNT(a) AS w0$o0]) +- Exchange(distribution=[hash[c]]) +- DataStreamScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c, proctime, rowtime]) ]]> diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/agg/WindowAggregateTest.xml b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/agg/WindowAggregateTest.xml index ca722ace653..b3dbc3a2ca4 100644 --- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/agg/WindowAggregateTest.xml +++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/agg/WindowAggregateTest.xml @@ -2636,7 +2636,7 @@ Calc(select=[c, EXPR$1, EXPR$2]) +- GroupAggregate(groupBy=[window_start, window_end, c, window_time], select=[window_start, window_end, c, window_time, MAX(c1) AS EXPR$1, COUNT(a) AS EXPR$2]) +- Exchange(distribution=[hash[window_start, window_end, c, window_time]]) +- Calc(select=[window_start, window_end, c, PROCTIME_MATERIALIZE(window_time) AS window_time, w0$o0 AS c1, a]) - +- OverAggregate(partitionBy=[c], orderBy=[proctime DESC], window=[ RANG BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], select=[a, b, c, d, e, rowtime, proctime, window_start, window_end, window_time, COUNT(*) AS w0$o0]) + +- OverAggregate(partitionBy=[c], orderBy=[proctime DESC], window=[ RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], select=[a, b, c, d, e, rowtime, proctime, window_start, window_end, window_time, COUNT(*) AS w0$o0]) +- Exchange(distribution=[hash[c]]) +- WindowTableFunction(window=[CUMULATE(time_col=[proctime], max_size=[5 min], step=[10 s])]) +- WatermarkAssigner(rowtime=[rowtime], watermark=[-(rowtime, 1000:INTERVAL SECOND)]) @@ -2676,7 +2676,7 @@ Calc(select=[c, EXPR$1, EXPR$2]) +- GroupAggregate(groupBy=[window_start, window_end, c, window_time], select=[window_start, window_end, c, window_time, MAX(c1) AS EXPR$1, COUNT(a) AS EXPR$2]) +- Exchange(distribution=[hash[window_start, window_end, c, window_time]]) +- Calc(select=[window_start, window_end, c, PROCTIME_MATERIALIZE(window_time) AS window_time, w0$o0 AS c1, a]) - +- OverAggregate(partitionBy=[c], orderBy=[proctime DESC], window=[ RANG BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], select=[a, b, c, d, e, rowtime, proctime, window_start, window_end, window_time, COUNT(*) AS w0$o0]) + +- OverAggregate(partitionBy=[c], orderBy=[proctime DESC], window=[ RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], select=[a, b, c, d, e, rowtime, proctime, window_start, window_end, window_time, COUNT(*) AS w0$o0]) +- Exchange(distribution=[hash[c]]) +- WindowTableFunction(window=[CUMULATE(time_col=[proctime], max_size=[5 min], step=[10 s])]) +- WatermarkAssigner(rowtime=[rowtime], watermark=[-(rowtime, 1000:INTERVAL SECOND)]) diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/join/SemiAntiJoinTest.xml b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/join/SemiAntiJoinTest.xml index 93f8ca912f5..cc42ab3cc40 100644 --- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/join/SemiAntiJoinTest.xml +++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/join/SemiAntiJoinTest.xml @@ -949,7 +949,7 @@ Join(joinType=[LeftSemiJoin], where=[((b = rk) AND (a <> d))], select=[a, b, c], : +- TableSourceScan(table=[[default_catalog, default_database, l]], fields=[a, b, c]) +- Exchange(distribution=[hash[rk]]) +- Calc(select=[w0$o0 AS rk, d]) - +- OverAggregate(partitionBy=[d], orderBy=[e ASC], window=[ RANG BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], select=[d, e, f, RANK(*) AS w0$o0]) + +- OverAggregate(partitionBy=[d], orderBy=[e ASC], window=[ RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], select=[d, e, f, RANK(*) AS w0$o0]) +- Exchange(distribution=[hash[d]]) +- TableSourceScan(table=[[default_catalog, default_database, r]], fields=[d, e, f]) ]]> @@ -1488,7 +1488,7 @@ Join(joinType=[LeftSemiJoin], where=[((a = $0) AND (b = $1))], select=[a, b, c], : +- TableSourceScan(table=[[default_catalog, default_database, l]], fields=[a, b, c]) +- Exchange(distribution=[hash[$0, $1]]) +- Calc(select=[w0$o0 AS $0, w0$o1 AS $1]) - +- OverAggregate(partitionBy=[f], orderBy=[d ASC], window=[ RANG BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], select=[d, e, f, MAX(d) AS w0$o0, MIN(e) AS w0$o1]) + +- OverAggregate(partitionBy=[f], orderBy=[d ASC], window=[ RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], select=[d, e, f, MAX(d) AS w0$o0, MIN(e) AS w0$o1]) +- Exchange(distribution=[hash[f]]) +- TableSourceScan(table=[[default_catalog, default_database, r]], fields=[d, e, f]) ]]> diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/table/OverAggregateTest.xml b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/table/OverAggregateTest.xml index 1cf5b98af65..3047a06dfcd 100644 --- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/table/OverAggregateTest.xml +++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/table/OverAggregateTest.xml @@ -26,7 +26,7 @@ LogicalProject(a=[$0], _c1=[AS(COUNT($2) OVER (ORDER BY $3 NULLS FIRST RANGE 100 <Resource name="optimized exec plan"> <![CDATA[ Calc(select=[a, w0$o0 AS _c1]) -+- OverAggregate(orderBy=[proctime ASC], window=[ RANG BETWEEN 10000 PRECEDING AND CURRENT ROW], select=[a, c, proctime, COUNT(c) AS w0$o0]) ++- OverAggregate(orderBy=[proctime ASC], window=[ RANGE BETWEEN 10000 PRECEDING AND CURRENT ROW], select=[a, c, proctime, COUNT(c) AS w0$o0]) +- Exchange(distribution=[single]) +- Calc(select=[a, c, proctime]) +- DataStreamScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c, proctime, rowtime]) @@ -60,7 +60,7 @@ LogicalProject(a=[$0], myAvg=[AS(*org.apache.flink.table.planner.plan.utils.Java <Resource name="optimized exec plan"> <![CDATA[ Calc(select=[a, w0$o0 AS myAvg]) -+- OverAggregate(partitionBy=[a], orderBy=[proctime ASC], window=[ RANG BETWEEN 7200000 PRECEDING AND CURRENT ROW], select=[a, c, proctime, $3, *org.apache.flink.table.planner.plan.utils.JavaUserDefinedAggFunctions$WeightedAvgWithRetract*(c, $3) AS w0$o0]) ++- OverAggregate(partitionBy=[a], orderBy=[proctime ASC], window=[ RANGE BETWEEN 7200000 PRECEDING AND CURRENT ROW], select=[a, c, proctime, $3, *org.apache.flink.table.planner.plan.utils.JavaUserDefinedAggFunctions$WeightedAvgWithRetract*(c, $3) AS w0$o0]) +- Exchange(distribution=[hash[a]]) +- Calc(select=[a, c, proctime, CAST(a AS BIGINT) AS $3]) +- DataStreamScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c, proctime, rowtime]) @@ -94,7 +94,7 @@ LogicalProject(a=[$0], c=[$2], _c2=[AS(COUNT($0) OVER (ORDER BY $3 NULLS FIRST), <Resource name="optimized exec plan"> <![CDATA[ Calc(select=[a, c, w0$o0 AS _c2, w0$o1 AS _c3]) -+- OverAggregate(orderBy=[proctime ASC], window=[ RANG BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], select=[a, c, proctime, COUNT(a) AS w0$o0, SUM(a) AS w0$o1]) ++- OverAggregate(orderBy=[proctime ASC], window=[ RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], select=[a, c, proctime, COUNT(a) AS w0$o0, SUM(a) AS w0$o1]) +- Exchange(distribution=[single]) +- Calc(select=[a, c, proctime]) +- DataStreamScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c, proctime, rowtime]) @@ -128,7 +128,7 @@ LogicalProject(a=[$0], c=[$2], _c2=[AS(COUNT($0) OVER (PARTITION BY $2 ORDER BY <Resource name="optimized exec plan"> <![CDATA[ Calc(select=[a, c, w0$o0 AS _c2, w0$o1 AS _c3]) -+- OverAggregate(partitionBy=[c], orderBy=[proctime ASC], window=[ RANG BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], select=[a, c, proctime, $3, COUNT(a) AS w0$o0, *org.apache.flink.table.planner.plan.utils.JavaUserDefinedAggFunctions$WeightedAvgWithRetract*(c, $3) AS w0$o1]) ++- OverAggregate(partitionBy=[c], orderBy=[proctime ASC], window=[ RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], select=[a, c, proctime, $3, COUNT(a) AS w0$o0, *org.apache.flink.table.planner.plan.utils.JavaUserDefinedAggFunctions$WeightedAvgWithRetract*(c, $3) AS w0$o1]) +- Exchange(distribution=[hash[c]]) +- Calc(select=[a, c, proctime, CAST(a AS BIGINT) AS $3]) +- DataStreamScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c, proctime, rowtime]) @@ -162,7 +162,7 @@ LogicalProject(c=[$2], _c1=[AS(COUNT(DISTINCT $0) OVER (PARTITION BY $2 ORDER BY <Resource name="optimized exec plan"> <![CDATA[ Calc(select=[c, w0$o0 AS _c1, w0$o1 AS _c2, w0$o2 AS _c3]) -+- OverAggregate(partitionBy=[c], orderBy=[rowtime ASC], window=[ RANG BETWEEN 7200000 PRECEDING AND CURRENT ROW], select=[a, c, rowtime, $3, COUNT(DISTINCT a) AS w0$o0, SUM(DISTINCT a) AS w0$o1, AVG(DISTINCT $3) AS w0$o2]) ++- OverAggregate(partitionBy=[c], orderBy=[rowtime ASC], window=[ RANGE BETWEEN 7200000 PRECEDING AND CURRENT ROW], select=[a, c, rowtime, $3, COUNT(DISTINCT a) AS w0$o0, SUM(DISTINCT a) AS w0$o1, AVG(DISTINCT $3) AS w0$o2]) +- Exchange(distribution=[hash[c]]) +- Calc(select=[a, c, rowtime, CAST(a AS FLOAT) AS $3]) +- DataStreamScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c, proctime, rowtime]) @@ -196,7 +196,7 @@ LogicalProject(a=[$0], _c1=[AS(COUNT($2) OVER (ORDER BY $4 NULLS FIRST RANGE 100 <Resource name="optimized exec plan"> <![CDATA[ Calc(select=[a, w0$o0 AS _c1]) -+- OverAggregate(orderBy=[rowtime ASC], window=[ RANG BETWEEN 10000 PRECEDING AND CURRENT ROW], select=[a, c, rowtime, COUNT(c) AS w0$o0]) ++- OverAggregate(orderBy=[rowtime ASC], window=[ RANGE BETWEEN 10000 PRECEDING AND CURRENT ROW], select=[a, c, rowtime, COUNT(c) AS w0$o0]) +- Exchange(distribution=[single]) +- Calc(select=[a, c, rowtime]) +- DataStreamScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c, proctime, rowtime]) @@ -213,7 +213,7 @@ LogicalProject(a=[$0], _c1=[AS(AVG($2) OVER (PARTITION BY $0 ORDER BY $4 NULLS F <Resource name="optimized exec plan"> <![CDATA[ Calc(select=[a, w0$o0 AS _c1, w0$o1 AS wAvg]) -+- OverAggregate(partitionBy=[a], orderBy=[rowtime ASC], window=[ RANG BETWEEN 7200000 PRECEDING AND CURRENT ROW], select=[a, c, rowtime, $3, AVG(c) AS w0$o0, *org.apache.flink.table.planner.plan.utils.JavaUserDefinedAggFunctions$WeightedAvgWithRetract*(c, $3) AS w0$o1]) ++- OverAggregate(partitionBy=[a], orderBy=[rowtime ASC], window=[ RANGE BETWEEN 7200000 PRECEDING AND CURRENT ROW], select=[a, c, rowtime, $3, AVG(c) AS w0$o0, *org.apache.flink.table.planner.plan.utils.JavaUserDefinedAggFunctions$WeightedAvgWithRetract*(c, $3) AS w0$o1]) +- Exchange(distribution=[hash[a]]) +- Calc(select=[a, c, rowtime, CAST(a AS BIGINT) AS $3]) +- DataStreamScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c, proctime, rowtime]) @@ -264,7 +264,7 @@ LogicalProject(c=[$2], _c1=[AS(COUNT(DISTINCT $0) OVER (PARTITION BY $2 ORDER BY <Resource name="optimized exec plan"> <![CDATA[ Calc(select=[c, w0$o0 AS _c1, w0$o1 AS _c2, w0$o2 AS _c3]) -+- OverAggregate(partitionBy=[c], orderBy=[rowtime ASC], window=[ RANG BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], select=[a, c, rowtime, $3, COUNT(DISTINCT a) AS w0$o0, SUM(DISTINCT a) AS w0$o1, AVG(DISTINCT $3) AS w0$o2]) ++- OverAggregate(partitionBy=[c], orderBy=[rowtime ASC], window=[ RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], select=[a, c, rowtime, $3, COUNT(DISTINCT a) AS w0$o0, SUM(DISTINCT a) AS w0$o1, AVG(DISTINCT $3) AS w0$o2]) +- Exchange(distribution=[hash[c]]) +- Calc(select=[a, c, rowtime, CAST(a AS FLOAT) AS $3]) +- DataStreamScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c, proctime, rowtime]) @@ -298,7 +298,7 @@ LogicalProject(a=[$0], c=[$2], _c2=[AS(COUNT($0) OVER (ORDER BY $4 NULLS FIRST), <Resource name="optimized exec plan"> <![CDATA[ Calc(select=[a, c, w0$o0 AS _c2, w0$o1 AS _c3]) -+- OverAggregate(orderBy=[rowtime ASC], window=[ RANG BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], select=[a, c, rowtime, COUNT(a) AS w0$o0, SUM(a) AS w0$o1]) ++- OverAggregate(orderBy=[rowtime ASC], window=[ RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], select=[a, c, rowtime, COUNT(a) AS w0$o0, SUM(a) AS w0$o1]) +- Exchange(distribution=[single]) +- Calc(select=[a, c, rowtime]) +- DataStreamScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c, proctime, rowtime]) @@ -349,7 +349,7 @@ LogicalProject(a=[$0], c=[$2], _c2=[AS(COUNT($0) OVER (PARTITION BY $2 ORDER BY <Resource name="optimized exec plan"> <![CDATA[ Calc(select=[a, c, w0$o0 AS _c2, w0$o1 AS wAvg]) -+- OverAggregate(partitionBy=[c], orderBy=[rowtime ASC], window=[ RANG BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], select=[a, c, rowtime, $3, COUNT(a) AS w0$o0, *org.apache.flink.table.planner.plan.utils.JavaUserDefinedAggFunctions$WeightedAvgWithRetract*(c, $3) AS w0$o1]) ++- OverAggregate(partitionBy=[c], orderBy=[rowtime ASC], window=[ RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], select=[a, c, rowtime, $3, COUNT(a) AS w0$o0, *org.apache.flink.table.planner.plan.utils.JavaUserDefinedAggFunctions$WeightedAvgWithRetract*(c, $3) AS w0$o1]) +- Exchange(distribution=[hash[c]]) +- Calc(select=[a, c, rowtime, CAST(a AS BIGINT) AS $3]) +- DataStreamScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c, proctime, rowtime]) diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/table/PythonOverWindowAggregateTest.xml b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/table/PythonOverWindowAggregateTest.xml index 296dd7cbbf8..2126bcbb8f2 100644 --- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/table/PythonOverWindowAggregateTest.xml +++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/table/PythonOverWindowAggregateTest.xml @@ -26,7 +26,7 @@ LogicalProject(b=[$1], _c1=[AS(*org.apache.flink.table.planner.runtime.utils.Jav <Resource name="optimized exec plan"> <![CDATA[ Calc(select=[b, w0$o0 AS _c1]) -+- PythonOverAggregate(partitionBy=[b], orderBy=[$3 ASC], window=[ RANG BETWEEN 10000 PRECEDING AND CURRENT ROW], select=[a, b, c, $3, PandasAggregateFunction(a, c) AS w0$o0]) ++- PythonOverAggregate(partitionBy=[b], orderBy=[$3 ASC], window=[ RANGE BETWEEN 10000 PRECEDING AND CURRENT ROW], select=[a, b, c, $3, PandasAggregateFunction(a, c) AS w0$o0]) +- Exchange(distribution=[hash[b]]) +- Calc(select=[a, b, c, PROCTIME() AS $3]) +- TableSourceScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c]) @@ -61,7 +61,7 @@ LogicalProject(b=[$1], _c1=[AS(*org.apache.flink.table.planner.runtime.utils.Jav <Resource name="optimized exec plan"> <![CDATA[ Calc(select=[b, w0$o0 AS _c1]) -+- PythonOverAggregate(partitionBy=[b], orderBy=[rowtime ASC], window=[ RANG BETWEEN 10000 PRECEDING AND CURRENT ROW], select=[a, b, c, rowtime, PandasAggregateFunction(a, c) AS w0$o0]) ++- PythonOverAggregate(partitionBy=[b], orderBy=[rowtime ASC], window=[ RANGE BETWEEN 10000 PRECEDING AND CURRENT ROW], select=[a, b, c, rowtime, PandasAggregateFunction(a, c) AS w0$o0]) +- Exchange(distribution=[hash[b]]) +- WatermarkAssigner(rowtime=[rowtime], watermark=[rowtime]) +- TableSourceScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c, rowtime]) diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/table/TableSourceTest.xml b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/table/TableSourceTest.xml index 7695b8010c3..b85eb7d846d 100644 --- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/table/TableSourceTest.xml +++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/table/TableSourceTest.xml @@ -41,7 +41,7 @@ LogicalFilter(condition=[>($2, 100)]) <Resource name="optimized exec plan"> <![CDATA[ Calc(select=[id, name, w0$o0 AS valSum], where=[(w0$o0 > 100)]) -+- OverAggregate(partitionBy=[id], orderBy=[$3 ASC], window=[ RANG BETWEEN 7200000 PRECEDING AND CURRENT ROW], select=[id, val, name, $3, SUM(val) AS w0$o0]) ++- OverAggregate(partitionBy=[id], orderBy=[$3 ASC], window=[ RANGE BETWEEN 7200000 PRECEDING AND CURRENT ROW], select=[id, val, name, $3, SUM(val) AS w0$o0]) +- Exchange(distribution=[hash[id]]) +- Calc(select=[id, val, name, PROCTIME() AS $3]) +- TableSourceScan(table=[[default_catalog, default_database, procTimeT]], fields=[id, val, name]) diff --git a/flink-table/flink-table-planner/src/test/resources/restore-tests/batch-exec-over-aggregate_1/over-aggregate-unbounded-partitioned-rows/plan/over-aggregate-unbounded-partitioned-rows.json b/flink-table/flink-table-planner/src/test/resources/restore-tests/batch-exec-over-aggregate_1/over-aggregate-unbounded-partitioned-rows/plan/over-aggregate-unbounded-partitioned-rows.json index 1cc54126226..a0e738c5c70 100644 --- a/flink-table/flink-table-planner/src/test/resources/restore-tests/batch-exec-over-aggregate_1/over-aggregate-unbounded-partitioned-rows/plan/over-aggregate-unbounded-partitioned-rows.json +++ b/flink-table/flink-table-planner/src/test/resources/restore-tests/batch-exec-over-aggregate_1/over-aggregate-unbounded-partitioned-rows/plan/over-aggregate-unbounded-partitioned-rows.json @@ -257,7 +257,7 @@ "priority" : 0 } ], "outputType" : "ROW<`ts` BIGINT, `a` BIGINT, `b` INT, `c` VARCHAR(2147483647), `rowtime` TIMESTAMP(3), `$5` BIGINT NOT NULL, `w0$o0` BIGINT, `w0$o1` BIGINT NOT NULL, `w0$o2` BIGINT NOT NULL>", - "description" : "OverAggregate(partitionBy=[c], orderBy=[rowtime ASC], window#0=[LTCNT(a, $5) AS w0$o0, COUNT(a) AS w0$o1, $SUM0(a) AS w0$o2 RANG BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], select=[ts, a, b, c, rowtime, $5, w0$o0, w0$o1, w0$o2])" + "description" : "OverAggregate(partitionBy=[c], orderBy=[rowtime ASC], window#0=[LTCNT(a, $5) AS w0$o0, COUNT(a) AS w0$o1, $SUM0(a) AS w0$o2 RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], select=[ts, a, b, c, rowtime, $5, w0$o0, w0$o1, w0$o2])" }, { "id" : 6, "type" : "batch-exec-calc_1", @@ -431,4 +431,4 @@ }, "shuffleMode" : "PIPELINED" } ] -} \ No newline at end of file +} diff --git a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-over-aggregate_1/over-aggregate-bounded-non-partitioned-rows-with-out-of-order-records/plan/over-aggregate-bounded-non-partitioned-rows-with-out-of-order-records.json b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-over-aggregate_1/over-aggregate-bounded-non-partitioned-rows-with-out-of-order-records/plan/over-aggregate-bounded-non-partitioned-rows-with-out-of-order-rec [...] index 4e73f861c70..7e61a62008f 100644 --- a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-over-aggregate_1/over-aggregate-bounded-non-partitioned-rows-with-out-of-order-records/plan/over-aggregate-bounded-non-partitioned-rows-with-out-of-order-records.json +++ b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-over-aggregate_1/over-aggregate-bounded-non-partitioned-rows-with-out-of-order-records/plan/over-aggregate-bounded-non-partitioned-rows-with-out-of-order-records.json @@ -357,7 +357,7 @@ "fieldType" : "BIGINT NOT NULL" } ] }, - "description" : "OverAggregate(partitionBy=[c], orderBy=[rowtime ASC], window=[ RANG BETWEEN 10000 PRECEDING AND CURRENT ROW], select=[ts, a, b, c, rowtime, $5, LTCNT(a, $5) AS w0$o0, COUNT(a) AS w0$o1, $SUM0(a) AS w0$o2])" + "description" : "OverAggregate(partitionBy=[c], orderBy=[rowtime ASC], window=[ RANGE BETWEEN 10000 PRECEDING AND CURRENT ROW], select=[ts, a, b, c, rowtime, $5, LTCNT(a, $5) AS w0$o0, COUNT(a) AS w0$o1, $SUM0(a) AS w0$o2])" }, { "id" : 15, "type" : "stream-exec-calc_1", @@ -588,4 +588,4 @@ }, "shuffleMode" : "PIPELINED" } ] -} \ No newline at end of file +} diff --git a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-over-aggregate_1/over-aggregate-bounded-non-partitioned-rows/plan/over-aggregate-bounded-non-partitioned-rows.json b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-over-aggregate_1/over-aggregate-bounded-non-partitioned-rows/plan/over-aggregate-bounded-non-partitioned-rows.json index 2afb5fe7bb9..ac8365bb10d 100644 --- a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-over-aggregate_1/over-aggregate-bounded-non-partitioned-rows/plan/over-aggregate-bounded-non-partitioned-rows.json +++ b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-over-aggregate_1/over-aggregate-bounded-non-partitioned-rows/plan/over-aggregate-bounded-non-partitioned-rows.json @@ -356,7 +356,7 @@ "fieldType" : "BIGINT NOT NULL" } ] }, - "description" : "OverAggregate(orderBy=[rowtime ASC], window=[ RANG BETWEEN 10000 PRECEDING AND CURRENT ROW], select=[ts, a, b, c, rowtime, $5, LTCNT(a, $5) AS w0$o0, COUNT(a) AS w0$o1, $SUM0(a) AS w0$o2])" + "description" : "OverAggregate(orderBy=[rowtime ASC], window=[ RANGE BETWEEN 10000 PRECEDING AND CURRENT ROW], select=[ts, a, b, c, rowtime, $5, LTCNT(a, $5) AS w0$o0, COUNT(a) AS w0$o1, $SUM0(a) AS w0$o2])" }, { "id" : 15, "type" : "stream-exec-calc_1", @@ -587,4 +587,4 @@ }, "shuffleMode" : "PIPELINED" } ] -} \ No newline at end of file +} diff --git a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-over-aggregate_1/over-aggregate-bounded-partitioned-rows-with-out-of-order-records/plan/over-aggregate-bounded-partitioned-rows-with-out-of-order-records.json b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-over-aggregate_1/over-aggregate-bounded-partitioned-rows-with-out-of-order-records/plan/over-aggregate-bounded-partitioned-rows-with-out-of-order-records.json index 8aad7619213..8905d1553ae 100644 --- a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-over-aggregate_1/over-aggregate-bounded-partitioned-rows-with-out-of-order-records/plan/over-aggregate-bounded-partitioned-rows-with-out-of-order-records.json +++ b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-over-aggregate_1/over-aggregate-bounded-partitioned-rows-with-out-of-order-records/plan/over-aggregate-bounded-partitioned-rows-with-out-of-order-records.json @@ -357,7 +357,7 @@ "fieldType" : "BIGINT NOT NULL" } ] }, - "description" : "OverAggregate(partitionBy=[c], orderBy=[rowtime ASC], window=[ RANG BETWEEN 10000 PRECEDING AND CURRENT ROW], select=[ts, a, b, c, rowtime, $5, LTCNT(a, $5) AS w0$o0, COUNT(a) AS w0$o1, $SUM0(a) AS w0$o2])", + "description" : "OverAggregate(partitionBy=[c], orderBy=[rowtime ASC], window=[ RANGE BETWEEN 10000 PRECEDING AND CURRENT ROW], select=[ts, a, b, c, rowtime, $5, LTCNT(a, $5) AS w0$o0, COUNT(a) AS w0$o1, $SUM0(a) AS w0$o2])", "unboundedOverVersion" : 1 }, { "id" : 7, diff --git a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-over-aggregate_1/over-aggregate-bounded-partitioned-rows/plan/over-aggregate-bounded-partitioned-rows.json b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-over-aggregate_1/over-aggregate-bounded-partitioned-rows/plan/over-aggregate-bounded-partitioned-rows.json index fa7b5b76970..55283e3a9ce 100644 --- a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-over-aggregate_1/over-aggregate-bounded-partitioned-rows/plan/over-aggregate-bounded-partitioned-rows.json +++ b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-over-aggregate_1/over-aggregate-bounded-partitioned-rows/plan/over-aggregate-bounded-partitioned-rows.json @@ -357,7 +357,7 @@ "fieldType" : "BIGINT NOT NULL" } ] }, - "description" : "OverAggregate(partitionBy=[c], orderBy=[rowtime ASC], window=[ RANG BETWEEN 10000 PRECEDING AND CURRENT ROW], select=[ts, a, b, c, rowtime, $5, LTCNT(a, $5) AS w0$o0, COUNT(a) AS w0$o1, $SUM0(a) AS w0$o2])", + "description" : "OverAggregate(partitionBy=[c], orderBy=[rowtime ASC], window=[ RANGE BETWEEN 10000 PRECEDING AND CURRENT ROW], select=[ts, a, b, c, rowtime, $5, LTCNT(a, $5) AS w0$o0, COUNT(a) AS w0$o1, $SUM0(a) AS w0$o2])", "unboundedOverVersion" : 1 }, { "id" : 7, diff --git a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-over-aggregate_1/over-aggregate-lag/plan/over-aggregate-lag.json b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-over-aggregate_1/over-aggregate-lag/plan/over-aggregate-lag.json index c09e4e1171c..0281b92a15b 100644 --- a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-over-aggregate_1/over-aggregate-lag/plan/over-aggregate-lag.json +++ b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-over-aggregate_1/over-aggregate-lag/plan/over-aggregate-lag.json @@ -210,7 +210,7 @@ "fieldType" : "MAP<DOUBLE, DOUBLE>" } ] }, - "description" : "OverAggregate(orderBy=[r_time ASC], window=[ RANG BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], select=[ts, b, r_time, LAG(b, 1) AS w0$o0])" + "description" : "OverAggregate(orderBy=[r_time ASC], window=[ RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], select=[ts, b, r_time, LAG(b, 1) AS w0$o0])" }, { "id" : 6, "type" : "stream-exec-calc_1", @@ -315,4 +315,4 @@ }, "shuffleMode" : "PIPELINED" } ] -} \ No newline at end of file +} diff --git a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-over-aggregate_1/over-aggregate-non-time-range-unbounded-avg-append-mode/plan/over-aggregate-non-time-range-unbounded-avg-append-mode.json b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-over-aggregate_1/over-aggregate-non-time-range-unbounded-avg-append-mode/plan/over-aggregate-non-time-range-unbounded-avg-append-mode.json index 2ce44ad537f..e2fe499f6a6 100644 --- a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-over-aggregate_1/over-aggregate-non-time-range-unbounded-avg-append-mode/plan/over-aggregate-non-time-range-unbounded-avg-append-mode.json +++ b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-over-aggregate_1/over-aggregate-non-time-range-unbounded-avg-append-mode/plan/over-aggregate-non-time-range-unbounded-avg-append-mode.json @@ -94,7 +94,7 @@ "priority" : 0 } ], "outputType" : "ROW<`key` VARCHAR(2147483647), `val` BIGINT, `ts` BIGINT, `w0$o0` BIGINT NOT NULL, `w0$o1` BIGINT NOT NULL>", - "description" : "OverAggregate(partitionBy=[key], orderBy=[val ASC], window=[ RANG BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], select=[key, val, ts, COUNT(val) AS w0$o0, $SUM0(val) AS w0$o1])", + "description" : "OverAggregate(partitionBy=[key], orderBy=[val ASC], window=[ RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], select=[key, val, ts, COUNT(val) AS w0$o0, $SUM0(val) AS w0$o1])", "unboundedOverVersion" : 2 }, { "id" : 21, @@ -234,4 +234,4 @@ }, "shuffleMode" : "PIPELINED" } ] -} \ No newline at end of file +} diff --git a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-over-aggregate_1/over-aggregate-non-time-range-unbounded-multiple-aggs-append-mode/plan/over-aggregate-non-time-range-unbounded-multiple-aggs-append-mode.json b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-over-aggregate_1/over-aggregate-non-time-range-unbounded-multiple-aggs-append-mode/plan/over-aggregate-non-time-range-unbounded-multiple-aggs-append-mode.json index 1d3547ff339..ec700f0bf28 100644 --- a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-over-aggregate_1/over-aggregate-non-time-range-unbounded-multiple-aggs-append-mode/plan/over-aggregate-non-time-range-unbounded-multiple-aggs-append-mode.json +++ b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-over-aggregate_1/over-aggregate-non-time-range-unbounded-multiple-aggs-append-mode/plan/over-aggregate-non-time-range-unbounded-multiple-aggs-append-mode.json @@ -104,7 +104,7 @@ "priority" : 0 } ], "outputType" : "ROW<`key` VARCHAR(2147483647), `val` BIGINT, `ts` BIGINT, `w0$o0` BIGINT NOT NULL, `w0$o1` BIGINT NOT NULL, `w0$o2` BIGINT NOT NULL>", - "description" : "OverAggregate(partitionBy=[key], orderBy=[val ASC], window=[ RANG BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], select=[key, val, ts, COUNT(val) AS w0$o0, $SUM0(val) AS w0$o1, COUNT(key) AS w0$o2])", + "description" : "OverAggregate(partitionBy=[key], orderBy=[val ASC], window=[ RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], select=[key, val, ts, COUNT(val) AS w0$o0, $SUM0(val) AS w0$o1, COUNT(key) AS w0$o2])", "unboundedOverVersion" : 2 }, { "id" : 26, @@ -241,4 +241,4 @@ }, "shuffleMode" : "PIPELINED" } ] -} \ No newline at end of file +} diff --git a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-over-aggregate_1/over-aggregate-non-time-range-unbounded-sum-append-mode/plan/over-aggregate-non-time-range-unbounded-sum-append-mode.json b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-over-aggregate_1/over-aggregate-non-time-range-unbounded-sum-append-mode/plan/over-aggregate-non-time-range-unbounded-sum-append-mode.json index 9d0d09ed79c..1c73cdfc3ce 100644 --- a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-over-aggregate_1/over-aggregate-non-time-range-unbounded-sum-append-mode/plan/over-aggregate-non-time-range-unbounded-sum-append-mode.json +++ b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-over-aggregate_1/over-aggregate-non-time-range-unbounded-sum-append-mode/plan/over-aggregate-non-time-range-unbounded-sum-append-mode.json @@ -94,7 +94,7 @@ "priority" : 0 } ], "outputType" : "ROW<`key` VARCHAR(2147483647), `val` BIGINT, `ts` BIGINT, `w0$o0` BIGINT NOT NULL, `w0$o1` BIGINT NOT NULL>", - "description" : "OverAggregate(partitionBy=[key], orderBy=[val ASC], window=[ RANG BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], select=[key, val, ts, COUNT(val) AS w0$o0, $SUM0(val) AS w0$o1])", + "description" : "OverAggregate(partitionBy=[key], orderBy=[val ASC], window=[ RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], select=[key, val, ts, COUNT(val) AS w0$o0, $SUM0(val) AS w0$o1])", "unboundedOverVersion" : 2 }, { "id" : 16, @@ -224,4 +224,4 @@ }, "shuffleMode" : "PIPELINED" } ] -} \ No newline at end of file +} diff --git a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-over-aggregate_1/over-aggregate-non-time-range-unbounded-sum-no-partition-by/plan/over-aggregate-non-time-range-unbounded-sum-no-partition-by.json b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-over-aggregate_1/over-aggregate-non-time-range-unbounded-sum-no-partition-by/plan/over-aggregate-non-time-range-unbounded-sum-no-partition-by.json index 18e9ff775a3..e9a5e181521 100644 --- a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-over-aggregate_1/over-aggregate-non-time-range-unbounded-sum-no-partition-by/plan/over-aggregate-non-time-range-unbounded-sum-no-partition-by.json +++ b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-over-aggregate_1/over-aggregate-non-time-range-unbounded-sum-no-partition-by/plan/over-aggregate-non-time-range-unbounded-sum-no-partition-by.json @@ -93,7 +93,7 @@ "priority" : 0 } ], "outputType" : "ROW<`key` VARCHAR(2147483647), `val` BIGINT, `ts` BIGINT, `w0$o0` BIGINT NOT NULL, `w0$o1` BIGINT NOT NULL>", - "description" : "OverAggregate(orderBy=[val ASC], window=[ RANG BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], select=[key, val, ts, COUNT(val) AS w0$o0, $SUM0(val) AS w0$o1])", + "description" : "OverAggregate(orderBy=[val ASC], window=[ RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], select=[key, val, ts, COUNT(val) AS w0$o0, $SUM0(val) AS w0$o1])", "unboundedOverVersion" : 2 }, { "id" : 31, @@ -223,4 +223,4 @@ }, "shuffleMode" : "PIPELINED" } ] -} \ No newline at end of file +} diff --git a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-over-aggregate_1/over-aggregate-non-time-range-unbounded-sum-retract-mode-sink-primary-key/plan/over-aggregate-non-time-range-unbounded-sum-retract-mode-sink-primary-key.json b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-over-aggregate_1/over-aggregate-non-time-range-unbounded-sum-retract-mode-sink-primary-key/plan/over-aggregate-non-time-range-unbounded-sum-retract-m [...] index 09218eb7fc0..11c0c495d16 100644 --- a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-over-aggregate_1/over-aggregate-non-time-range-unbounded-sum-retract-mode-sink-primary-key/plan/over-aggregate-non-time-range-unbounded-sum-retract-mode-sink-primary-key.json +++ b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-over-aggregate_1/over-aggregate-non-time-range-unbounded-sum-retract-mode-sink-primary-key/plan/over-aggregate-non-time-range-unbounded-sum-retract-mode-sink-primary-key.json @@ -94,7 +94,7 @@ "priority" : 0 } ], "outputType" : "ROW<`key` VARCHAR(2147483647), `val` BIGINT, `ts` BIGINT, `w0$o0` BIGINT NOT NULL, `w0$o1` BIGINT NOT NULL>", - "description" : "OverAggregate(partitionBy=[key], orderBy=[val ASC], window=[ RANG BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], select=[key, val, ts, COUNT(val) AS w0$o0, $SUM0(val) AS w0$o1])", + "description" : "OverAggregate(partitionBy=[key], orderBy=[val ASC], window=[ RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], select=[key, val, ts, COUNT(val) AS w0$o0, $SUM0(val) AS w0$o1])", "unboundedOverVersion" : 2 }, { "id" : 41, @@ -235,4 +235,4 @@ }, "shuffleMode" : "PIPELINED" } ] -} \ No newline at end of file +} diff --git a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-over-aggregate_1/over-aggregate-non-time-range-unbounded-sum-retract-mode-sort-by-key/plan/over-aggregate-non-time-range-unbounded-sum-retract-mode-sort-by-key.json b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-over-aggregate_1/over-aggregate-non-time-range-unbounded-sum-retract-mode-sort-by-key/plan/over-aggregate-non-time-range-unbounded-sum-retract-mode-sort-by-key.json index 7e444e9bb37..056f57aaa63 100644 --- a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-over-aggregate_1/over-aggregate-non-time-range-unbounded-sum-retract-mode-sort-by-key/plan/over-aggregate-non-time-range-unbounded-sum-retract-mode-sort-by-key.json +++ b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-over-aggregate_1/over-aggregate-non-time-range-unbounded-sum-retract-mode-sort-by-key/plan/over-aggregate-non-time-range-unbounded-sum-retract-mode-sort-by-key.json @@ -91,7 +91,7 @@ "priority" : 0 } ], "outputType" : "ROW<`key` VARCHAR(2147483647), `val` BIGINT, `ts` BIGINT, `w0$o0` BIGINT NOT NULL, `w0$o1` BIGINT NOT NULL>", - "description" : "OverAggregate(partitionBy=[val], orderBy=[key ASC], window=[ RANG BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], select=[key, val, ts, COUNT(val) AS w0$o0, $SUM0(val) AS w0$o1])", + "description" : "OverAggregate(partitionBy=[val], orderBy=[key ASC], window=[ RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], select=[key, val, ts, COUNT(val) AS w0$o0, $SUM0(val) AS w0$o1])", "unboundedOverVersion" : 2 }, { "id" : 4, @@ -219,4 +219,4 @@ }, "shuffleMode" : "PIPELINED" } ] -} \ No newline at end of file +} diff --git a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-over-aggregate_1/over-aggregate-non-time-range-unbounded-sum-retract-mode-source-primary-key/plan/over-aggregate-non-time-range-unbounded-sum-retract-mode-source-primary-key.json b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-over-aggregate_1/over-aggregate-non-time-range-unbounded-sum-retract-mode-source-primary-key/plan/over-aggregate-non-time-range-unbounded-sum-ret [...] index 4a8e2b5292d..1173ff5983b 100644 --- a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-over-aggregate_1/over-aggregate-non-time-range-unbounded-sum-retract-mode-source-primary-key/plan/over-aggregate-non-time-range-unbounded-sum-retract-mode-source-primary-key.json +++ b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-over-aggregate_1/over-aggregate-non-time-range-unbounded-sum-retract-mode-source-primary-key/plan/over-aggregate-non-time-range-unbounded-sum-retract-mode-source-primary-key.json @@ -99,7 +99,7 @@ "priority" : 0 } ], "outputType" : "ROW<`key` VARCHAR(2147483647) NOT NULL, `val` BIGINT, `ts` BIGINT, `w0$o0` BIGINT NOT NULL, `w0$o1` BIGINT NOT NULL>", - "description" : "OverAggregate(partitionBy=[key], orderBy=[val ASC], window=[ RANG BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], select=[key, val, ts, COUNT(val) AS w0$o0, $SUM0(val) AS w0$o1])", + "description" : "OverAggregate(partitionBy=[key], orderBy=[val ASC], window=[ RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], select=[key, val, ts, COUNT(val) AS w0$o0, $SUM0(val) AS w0$o1])", "unboundedOverVersion" : 2 }, { "id" : 36, @@ -230,4 +230,4 @@ }, "shuffleMode" : "PIPELINED" } ] -} \ No newline at end of file +} diff --git a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-over-aggregate_1/over-aggregate-non-time-range-unbounded-sum-retract-mode-source-sink-primary-key-partition-by-non-pk/plan/over-aggregate-non-time-range-unbounded-sum-retract-mode-source-sink-primary-key-partition-by-non-pk.json b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-over-aggregate_1/over-aggregate-non-time-range-unbounded-sum-retract-mode-source-sink-primary-k [...] index 2bf686cb9cf..729736c24cf 100644 --- a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-over-aggregate_1/over-aggregate-non-time-range-unbounded-sum-retract-mode-source-sink-primary-key-partition-by-non-pk/plan/over-aggregate-non-time-range-unbounded-sum-retract-mode-source-sink-primary-key-partition-by-non-pk.json +++ b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-over-aggregate_1/over-aggregate-non-time-range-unbounded-sum-retract-mode-source-sink-primary-key-partition-by-non-pk/plan/over-aggregate-non-time-range-unbounded-sum-retract-mode-source-sink-primary-key-partition-by-non-pk.json @@ -99,7 +99,7 @@ "priority" : 0 } ], "outputType" : "ROW<`key` VARCHAR(2147483647), `val` BIGINT NOT NULL, `ts` BIGINT, `w0$o0` BIGINT NOT NULL, `w0$o1` BIGINT NOT NULL>", - "description" : "OverAggregate(partitionBy=[key], orderBy=[val ASC], window=[ RANG BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], select=[key, val, ts, COUNT(ts) AS w0$o0, $SUM0(ts) AS w0$o1])", + "description" : "OverAggregate(partitionBy=[key], orderBy=[val ASC], window=[ RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], select=[key, val, ts, COUNT(ts) AS w0$o0, $SUM0(ts) AS w0$o1])", "unboundedOverVersion" : 2 }, { "id" : 54, @@ -240,4 +240,4 @@ }, "shuffleMode" : "PIPELINED" } ] -} \ No newline at end of file +} diff --git a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-over-aggregate_1/over-aggregate-non-time-range-unbounded-sum-retract-mode-source-sink-primary-key/plan/over-aggregate-non-time-range-unbounded-sum-retract-mode-source-sink-primary-key.json b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-over-aggregate_1/over-aggregate-non-time-range-unbounded-sum-retract-mode-source-sink-primary-key/plan/over-aggregate-non-time-range-un [...] index 4830f8e51fb..03b7c08e214 100644 --- a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-over-aggregate_1/over-aggregate-non-time-range-unbounded-sum-retract-mode-source-sink-primary-key/plan/over-aggregate-non-time-range-unbounded-sum-retract-mode-source-sink-primary-key.json +++ b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-over-aggregate_1/over-aggregate-non-time-range-unbounded-sum-retract-mode-source-sink-primary-key/plan/over-aggregate-non-time-range-unbounded-sum-retract-mode-source-sink-primary-key.json @@ -147,7 +147,7 @@ "priority" : 0 } ], "outputType" : "ROW<`key` VARCHAR(2147483647) NOT NULL, `val` BIGINT, `ts` BIGINT, `w0$o0` BIGINT NOT NULL, `w0$o1` BIGINT NOT NULL>", - "description" : "OverAggregate(partitionBy=[key], orderBy=[val ASC], window=[ RANG BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], select=[key, val, ts, COUNT(val) AS w0$o0, $SUM0(val) AS w0$o1])", + "description" : "OverAggregate(partitionBy=[key], orderBy=[val ASC], window=[ RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], select=[key, val, ts, COUNT(val) AS w0$o0, $SUM0(val) AS w0$o1])", "unboundedOverVersion" : 2 }, { "id" : 49, @@ -304,4 +304,4 @@ }, "shuffleMode" : "PIPELINED" } ] -} \ No newline at end of file +} diff --git a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-over-aggregate_1/over-aggregate-non-time-range-unbounded-sum-retract-mode/plan/over-aggregate-non-time-range-unbounded-sum-retract-mode.json b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-over-aggregate_1/over-aggregate-non-time-range-unbounded-sum-retract-mode/plan/over-aggregate-non-time-range-unbounded-sum-retract-mode.json index b7de4fb3f6d..b943cd06f9d 100644 --- a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-over-aggregate_1/over-aggregate-non-time-range-unbounded-sum-retract-mode/plan/over-aggregate-non-time-range-unbounded-sum-retract-mode.json +++ b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-over-aggregate_1/over-aggregate-non-time-range-unbounded-sum-retract-mode/plan/over-aggregate-non-time-range-unbounded-sum-retract-mode.json @@ -94,7 +94,7 @@ "priority" : 0 } ], "outputType" : "ROW<`key` VARCHAR(2147483647), `val` BIGINT, `ts` BIGINT, `w0$o0` BIGINT NOT NULL, `w0$o1` BIGINT NOT NULL>", - "description" : "OverAggregate(partitionBy=[key], orderBy=[val ASC], window=[ RANG BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], select=[key, val, ts, COUNT(val) AS w0$o0, $SUM0(val) AS w0$o1])", + "description" : "OverAggregate(partitionBy=[key], orderBy=[val ASC], window=[ RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], select=[key, val, ts, COUNT(val) AS w0$o0, $SUM0(val) AS w0$o1])", "unboundedOverVersion" : 2 }, { "id" : 11, @@ -224,4 +224,4 @@ }, "shuffleMode" : "PIPELINED" } ] -} \ No newline at end of file +} diff --git a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-over-aggregate_1/over-aggregate-unbounded-partitioned-rows-with-out-of-order-records/plan/over-aggregate-unbounded-partitioned-rows-with-out-of-order-records.json b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-over-aggregate_1/over-aggregate-unbounded-partitioned-rows-with-out-of-order-records/plan/over-aggregate-unbounded-partitioned-rows-with-out-of-order-records.json index 4ea0de87baf..c59f0058d77 100644 --- a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-over-aggregate_1/over-aggregate-unbounded-partitioned-rows-with-out-of-order-records/plan/over-aggregate-unbounded-partitioned-rows-with-out-of-order-records.json +++ b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-over-aggregate_1/over-aggregate-unbounded-partitioned-rows-with-out-of-order-records/plan/over-aggregate-unbounded-partitioned-rows-with-out-of-order-records.json @@ -347,7 +347,7 @@ "fieldType" : "BIGINT NOT NULL" } ] }, - "description" : "OverAggregate(partitionBy=[c], orderBy=[rowtime ASC], window=[ RANG BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], select=[ts, a, b, c, rowtime, $5, LTCNT(a, $5) AS w0$o0, COUNT(a) AS w0$o1, $SUM0(a) AS w0$o2])" + "description" : "OverAggregate(partitionBy=[c], orderBy=[rowtime ASC], window=[ RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], select=[ts, a, b, c, rowtime, $5, LTCNT(a, $5) AS w0$o0, COUNT(a) AS w0$o1, $SUM0(a) AS w0$o2])" }, { "id" : 23, "type" : "stream-exec-calc_1", @@ -578,4 +578,4 @@ }, "shuffleMode" : "PIPELINED" } ] -} \ No newline at end of file +} diff --git a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-over-aggregate_1/over-aggregate-unbounded-partitioned-rows/plan/over-aggregate-unbounded-partitioned-rows.json b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-over-aggregate_1/over-aggregate-unbounded-partitioned-rows/plan/over-aggregate-unbounded-partitioned-rows.json index 6efa3a92257..fd860cd4594 100644 --- a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-over-aggregate_1/over-aggregate-unbounded-partitioned-rows/plan/over-aggregate-unbounded-partitioned-rows.json +++ b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-over-aggregate_1/over-aggregate-unbounded-partitioned-rows/plan/over-aggregate-unbounded-partitioned-rows.json @@ -347,7 +347,7 @@ "fieldType" : "BIGINT NOT NULL" } ] }, - "description" : "OverAggregate(partitionBy=[c], orderBy=[rowtime ASC], window=[ RANG BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], select=[ts, a, b, c, rowtime, $5, LTCNT(a, $5) AS w0$o0, COUNT(a) AS w0$o1, $SUM0(a) AS w0$o2])", + "description" : "OverAggregate(partitionBy=[c], orderBy=[rowtime ASC], window=[ RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], select=[ts, a, b, c, rowtime, $5, LTCNT(a, $5) AS w0$o0, COUNT(a) AS w0$o1, $SUM0(a) AS w0$o2])", "unboundedOverVersion" : 1 }, { "id" : 7,