This is an automated email from the ASF dual-hosted git repository.

snuyanzin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit f4328b3729aa32c58742dc136e9c994101f8c6ee
Author: Sergey Nuyanzin <snuyan...@gmail.com>
AuthorDate: Tue Jul 29 17:11:27 2025 +0200

    [FLINK-38150][table] Use `RANGE` in plans instead of `RANG`
    
    This closes #26840.
---
 .../table/planner/plan/utils/RelExplainUtil.scala  |  2 +-
 .../planner/plan/batch/sql/DeadlockBreakupTest.xml |  6 +-
 .../plan/batch/sql/ForwardHashExchangeTest.xml     |  8 +--
 .../planner/plan/batch/sql/RemoveCollationTest.xml |  2 +-
 .../planner/plan/batch/sql/RemoveShuffleTest.xml   | 10 +--
 .../planner/plan/batch/sql/SubplanReuseTest.xml    |  6 +-
 .../batch/sql/agg/AggregateReduceGroupingTest.xml  |  2 +-
 .../plan/batch/sql/agg/OverAggregateTest.xml       | 82 +++++++++++-----------
 .../sql/join/BroadcastHashSemiAntiJoinTest.xml     |  6 +-
 .../batch/sql/join/NestedLoopSemiAntiJoinTest.xml  |  6 +-
 .../plan/batch/sql/join/SemiAntiJoinTest.xml       |  6 +-
 .../sql/join/ShuffledHashSemiAntiJoinTest.xml      |  6 +-
 .../batch/sql/join/SortMergeSemiAntiJoinTest.xml   |  6 +-
 .../batch/table/PythonOverWindowAggregateTest.xml  |  2 +-
 .../testProcTimeBoundedNonPartitionedRangeOver.out |  2 +-
 .../testProcTimeBoundedPartitionedRangeOver.out    |  2 +-
 .../testProcTimeUnboundedPartitionedRangeOver.out  |  2 +-
 .../PushLocalAggIntoTableSourceScanRuleTest.xml    |  2 +-
 .../planner/plan/stream/sql/MatchRecognizeTest.xml |  2 +-
 .../planner/plan/stream/sql/SubplanReuseTest.xml   |  4 +-
 .../plan/stream/sql/agg/OverAggregateTest.xml      | 22 +++---
 .../plan/stream/sql/agg/WindowAggregateTest.xml    |  4 +-
 .../plan/stream/sql/join/SemiAntiJoinTest.xml      |  4 +-
 .../plan/stream/table/OverAggregateTest.xml        | 20 +++---
 .../stream/table/PythonOverWindowAggregateTest.xml |  4 +-
 .../planner/plan/stream/table/TableSourceTest.xml  |  2 +-
 .../over-aggregate-unbounded-partitioned-rows.json |  4 +-
 ...partitioned-rows-with-out-of-order-records.json |  4 +-
 ...ver-aggregate-bounded-non-partitioned-rows.json |  4 +-
 ...partitioned-rows-with-out-of-order-records.json |  2 +-
 .../over-aggregate-bounded-partitioned-rows.json   |  2 +-
 .../plan/over-aggregate-lag.json                   |  4 +-
 ...e-non-time-range-unbounded-avg-append-mode.json |  4 +-
 ...-range-unbounded-multiple-aggs-append-mode.json |  4 +-
 ...e-non-time-range-unbounded-sum-append-mode.json |  4 +-
 ...n-time-range-unbounded-sum-no-partition-by.json |  4 +-
 ...nbounded-sum-retract-mode-sink-primary-key.json |  4 +-
 ...nge-unbounded-sum-retract-mode-sort-by-key.json |  4 +-
 ...ounded-sum-retract-mode-source-primary-key.json |  4 +-
 ...ource-sink-primary-key-partition-by-non-pk.json |  4 +-
 ...d-sum-retract-mode-source-sink-primary-key.json |  4 +-
 ...-non-time-range-unbounded-sum-retract-mode.json |  4 +-
 ...partitioned-rows-with-out-of-order-records.json |  4 +-
 .../over-aggregate-unbounded-partitioned-rows.json |  2 +-
 44 files changed, 143 insertions(+), 143 deletions(-)

diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/RelExplainUtil.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/RelExplainUtil.scala
index 621c3070096..970ed8fae50 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/RelExplainUtil.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/RelExplainUtil.scala
@@ -621,7 +621,7 @@ object RelExplainUtil {
     }
 
     val buf = new StringBuilder
-    buf.append(if (groupWindow.isRows) " ROWS " else " RANG ")
+    buf.append(if (groupWindow.isRows) " ROWS " else " RANGE ")
     val lowerBound = groupWindow.lowerBound
     val upperBound = groupWindow.upperBound
     if (lowerBound != null) {
diff --git 
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/DeadlockBreakupTest.xml
 
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/DeadlockBreakupTest.xml
index 5d23710c48b..d347bb578cb 100644
--- 
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/DeadlockBreakupTest.xml
+++ 
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/DeadlockBreakupTest.xml
@@ -539,18 +539,18 @@ LogicalProject(a=[$0], b=[$1], c=[$2], a0=[$3], b0=[$4], 
c0=[$5])
 HashJoin(joinType=[InnerJoin], where=[(c = c0)], select=[a, b, c, a0, b0, c0], 
build=[left])
 :- Exchange(distribution=[hash[c]])
 :  +- Calc(select=[w0$o0 AS a, b, c])
-:     +- OverAggregate(partitionBy=[b], orderBy=[b ASC], window#0=[MAX(a) AS 
w0$o0 RANG BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING], select=[a, b, 
c, w0$o0])
+:     +- OverAggregate(partitionBy=[b], orderBy=[b ASC], window#0=[MAX(a) AS 
w0$o0 RANGE BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING], select=[a, b, 
c, w0$o0])
 :        +- Exchange(distribution=[forward])
 :           +- Calc(select=[CASE((w0$o0 > 0), w0$o1, null:INTEGER) AS a, b, 
c])(reuse_id=[1])
 :              +- Exchange(distribution=[forward])
-:                 +- OverAggregate(partitionBy=[b], orderBy=[b ASC], 
window#0=[COUNT(a) AS w0$o0, $SUM0(a) AS w0$o1 RANG BETWEEN UNBOUNDED PRECEDING 
AND CURRENT ROW], select=[a, b, c, w0$o0, w0$o1])
+:                 +- OverAggregate(partitionBy=[b], orderBy=[b ASC], 
window#0=[COUNT(a) AS w0$o0, $SUM0(a) AS w0$o1 RANGE BETWEEN UNBOUNDED 
PRECEDING AND CURRENT ROW], select=[a, b, c, w0$o0, w0$o1])
 :                    +- Exchange(distribution=[forward])
 :                       +- Sort(orderBy=[b ASC])
 :                          +- Exchange(distribution=[hash[b]])
 :                             +- TableSourceScan(table=[[default_catalog, 
default_database, x]], fields=[a, b, c])
 +- Exchange(distribution=[hash[c]], shuffle_mode=[BATCH])
    +- Calc(select=[w0$o0 AS a, b, c])
-      +- OverAggregate(partitionBy=[b], orderBy=[b ASC], window#0=[MIN(a) AS 
w0$o0 RANG BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING], select=[a, b, 
c, w0$o0])
+      +- OverAggregate(partitionBy=[b], orderBy=[b ASC], window#0=[MIN(a) AS 
w0$o0 RANGE BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING], select=[a, b, 
c, w0$o0])
          +- Exchange(distribution=[forward])
             +- Reused(reference_id=[1])
 ]]>
diff --git 
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/ForwardHashExchangeTest.xml
 
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/ForwardHashExchangeTest.xml
index d3094ef8e6a..c623d637ad9 100644
--- 
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/ForwardHashExchangeTest.xml
+++ 
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/ForwardHashExchangeTest.xml
@@ -298,7 +298,7 @@ LogicalProject(b=[$0], EXPR$1=[RANK() OVER (ORDER BY $0 
NULLS FIRST)])
     </Resource>
     <Resource name="optimized exec plan">
       <![CDATA[
-OverAggregate(orderBy=[b ASC], window#0=[RANK(*) AS w0$o0 RANG BETWEEN 
UNBOUNDED PRECEDING AND CURRENT ROW], select=[b, w0$o0])
+OverAggregate(orderBy=[b ASC], window#0=[RANK(*) AS w0$o0 RANGE BETWEEN 
UNBOUNDED PRECEDING AND CURRENT ROW], select=[b, w0$o0])
 +- Exchange(distribution=[forward])
    +- Sort(orderBy=[b ASC])
       +- Exchange(distribution=[forward])
@@ -330,7 +330,7 @@ LogicalProject(sum_b=[$1], avg_b=[/(CASE(>(COUNT($1) OVER 
(PARTITION BY $0), 0),
     <Resource name="optimized exec plan">
       <![CDATA[
 Calc(select=[sum_b, (CASE((w0$o0 > 0), w0$o1, null:BIGINT) / w0$o0) AS avg_b, 
w1$o0 AS rn, c])
-+- OverAggregate(partitionBy=[c], window#0=[COUNT(sum_b) AS w0$o0, 
$SUM0(sum_b) AS w0$o1 RANG BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED 
FOLLOWING], window#1=[RANK(*) AS w1$o0 RANG BETWEEN UNBOUNDED PRECEDING AND 
CURRENT ROW], select=[c, sum_b, w0$o0, w0$o1, w1$o0])
++- OverAggregate(partitionBy=[c], window#0=[COUNT(sum_b) AS w0$o0, 
$SUM0(sum_b) AS w0$o1 RANGE BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED 
FOLLOWING], window#1=[RANK(*) AS w1$o0 RANGE BETWEEN UNBOUNDED PRECEDING AND 
CURRENT ROW], select=[c, sum_b, w0$o0, w0$o1, w1$o0])
    +- Exchange(distribution=[forward])
       +- Sort(orderBy=[c ASC])
          +- Exchange(distribution=[keep_input_as_is[hash[c]]])
@@ -355,7 +355,7 @@ LogicalProject(b=[$0], EXPR$1=[RANK() OVER (ORDER BY $0 
NULLS FIRST)])
     </Resource>
     <Resource name="optimized exec plan">
       <![CDATA[
-OverAggregate(orderBy=[b ASC], window#0=[RANK(*) AS w0$o0 RANG BETWEEN 
UNBOUNDED PRECEDING AND CURRENT ROW], select=[b, w0$o0])
+OverAggregate(orderBy=[b ASC], window#0=[RANK(*) AS w0$o0 RANGE BETWEEN 
UNBOUNDED PRECEDING AND CURRENT ROW], select=[b, w0$o0])
 +- Exchange(distribution=[forward])
    +- Sort(orderBy=[b ASC])
       +- Exchange(distribution=[forward])
@@ -387,7 +387,7 @@ LogicalProject(sum_b=[$1], avg_b=[/(CASE(>(COUNT($1) OVER 
(PARTITION BY $0), 0),
     <Resource name="optimized exec plan">
       <![CDATA[
 Calc(select=[sum_b, (CASE((w0$o0 > 0), w0$o1, null:BIGINT) / w0$o0) AS avg_b, 
w1$o0 AS rn, c])
-+- OverAggregate(partitionBy=[c], window#0=[COUNT(sum_b) AS w0$o0, 
$SUM0(sum_b) AS w0$o1 RANG BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED 
FOLLOWING], window#1=[RANK(*) AS w1$o0 RANG BETWEEN UNBOUNDED PRECEDING AND 
CURRENT ROW], select=[c, sum_b, w0$o0, w0$o1, w1$o0])
++- OverAggregate(partitionBy=[c], window#0=[COUNT(sum_b) AS w0$o0, 
$SUM0(sum_b) AS w0$o1 RANGE BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED 
FOLLOWING], window#1=[RANK(*) AS w1$o0 RANGE BETWEEN UNBOUNDED PRECEDING AND 
CURRENT ROW], select=[c, sum_b, w0$o0, w0$o1, w1$o0])
    +- Exchange(distribution=[forward])
       +- SortAggregate(isMerge=[true], groupBy=[c], select=[c, 
Final_SUM(sum$0) AS sum_b])
          +- Exchange(distribution=[forward])
diff --git 
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/RemoveCollationTest.xml
 
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/RemoveCollationTest.xml
index 958420904ce..baac5667b37 100644
--- 
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/RemoveCollationTest.xml
+++ 
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/RemoveCollationTest.xml
@@ -503,7 +503,7 @@ LogicalProject(sum_b=[$1], avg_b=[/(CASE(>(COUNT($1) OVER 
(PARTITION BY $0 ORDER
     <Resource name="optimized exec plan">
       <![CDATA[
 Calc(select=[sum_b, (CASE((w0$o0 > 0), w0$o1, null:BIGINT) / w0$o0) AS avg_b, 
w0$o2 AS rn])
-+- OverAggregate(partitionBy=[a], orderBy=[a ASC], window#0=[COUNT(sum_b) AS 
w0$o0, $SUM0(sum_b) AS w0$o1 RANG BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], 
window#1=[RANK(*) AS w0$o2 RANG BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], 
select=[a, sum_b, w0$o0, w0$o1, w0$o2])
++- OverAggregate(partitionBy=[a], orderBy=[a ASC], window#0=[COUNT(sum_b) AS 
w0$o0, $SUM0(sum_b) AS w0$o1 RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT 
ROW], window#1=[RANK(*) AS w0$o2 RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT 
ROW], select=[a, sum_b, w0$o0, w0$o1, w0$o2])
    +- Exchange(distribution=[forward])
       +- SortAggregate(isMerge=[true], groupBy=[a], select=[a, 
Final_SUM(sum$0) AS sum_b])
          +- Exchange(distribution=[forward])
diff --git 
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/RemoveShuffleTest.xml
 
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/RemoveShuffleTest.xml
index 97a174f1905..2fc7095c9c4 100644
--- 
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/RemoveShuffleTest.xml
+++ 
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/RemoveShuffleTest.xml
@@ -561,11 +561,11 @@ LogicalProject(sum_b=[$2], avg_b=[/(CASE(>(COUNT($2) OVER 
(PARTITION BY $0, $1),
     <Resource name="optimized exec plan">
       <![CDATA[
 Calc(select=[sum_b, (CASE((w0$o0 > 0), w0$o1, null:BIGINT) / w0$o0) AS avg_b, 
w1$o0 AS rn, c])
-+- OverAggregate(partitionBy=[c], orderBy=[a ASC, c ASC], window#0=[RANK(*) AS 
w1$o0 RANG BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], select=[a, c, sum_b, 
w0$o0, w0$o1, w1$o0])
++- OverAggregate(partitionBy=[c], orderBy=[a ASC, c ASC], window#0=[RANK(*) AS 
w1$o0 RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], select=[a, c, sum_b, 
w0$o0, w0$o1, w1$o0])
    +- Exchange(distribution=[forward])
       +- Sort(orderBy=[c ASC, a ASC])
          +- Exchange(distribution=[hash[c]])
-            +- OverAggregate(partitionBy=[a, c], window#0=[COUNT(sum_b) AS 
w0$o0, $SUM0(sum_b) AS w0$o1 RANG BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED 
FOLLOWING], select=[a, c, sum_b, w0$o0, w0$o1])
+            +- OverAggregate(partitionBy=[a, c], window#0=[COUNT(sum_b) AS 
w0$o0, $SUM0(sum_b) AS w0$o1 RANGE BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED 
FOLLOWING], select=[a, c, sum_b, w0$o0, w0$o1])
                +- Exchange(distribution=[forward])
                   +- Sort(orderBy=[a ASC, c ASC])
                      +- Exchange(distribution=[keep_input_as_is[hash[a, c]]])
@@ -639,7 +639,7 @@ LogicalProject(sum_b=[$1], avg_b=[/(CASE(>(COUNT($1) OVER 
(PARTITION BY $0), 0),
     <Resource name="optimized exec plan">
       <![CDATA[
 Calc(select=[sum_b, (CASE((w0$o0 > 0), w0$o1, null:BIGINT) / w0$o0) AS avg_b, 
w1$o0 AS rn, c])
-+- OverAggregate(partitionBy=[c], window#0=[COUNT(sum_b) AS w0$o0, 
$SUM0(sum_b) AS w0$o1 RANG BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED 
FOLLOWING], window#1=[RANK(*) AS w1$o0 RANG BETWEEN UNBOUNDED PRECEDING AND 
CURRENT ROW], select=[c, sum_b, w0$o0, w0$o1, w1$o0])
++- OverAggregate(partitionBy=[c], window#0=[COUNT(sum_b) AS w0$o0, 
$SUM0(sum_b) AS w0$o1 RANGE BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED 
FOLLOWING], window#1=[RANK(*) AS w1$o0 RANGE BETWEEN UNBOUNDED PRECEDING AND 
CURRENT ROW], select=[c, sum_b, w0$o0, w0$o1, w1$o0])
    +- Exchange(distribution=[forward])
       +- Sort(orderBy=[c ASC])
          +- Exchange(distribution=[keep_input_as_is[hash[c]]])
@@ -674,7 +674,7 @@ LogicalProject(sum_b=[$2], avg_b=[/(CASE(>(COUNT($2) OVER 
(PARTITION BY $1), 0),
     <Resource name="optimized exec plan">
       <![CDATA[
 Calc(select=[sum_b, (CASE((w0$o0 > 0), w0$o1, null:BIGINT) / w0$o0) AS avg_b, 
w1$o0 AS rn, c])
-+- OverAggregate(partitionBy=[c], window#0=[COUNT(sum_b) AS w0$o0, 
$SUM0(sum_b) AS w0$o1 RANG BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED 
FOLLOWING], window#1=[RANK(*) AS w1$o0 RANG BETWEEN UNBOUNDED PRECEDING AND 
CURRENT ROW], select=[c, sum_b, w0$o0, w0$o1, w1$o0])
++- OverAggregate(partitionBy=[c], window#0=[COUNT(sum_b) AS w0$o0, 
$SUM0(sum_b) AS w0$o1 RANGE BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED 
FOLLOWING], window#1=[RANK(*) AS w1$o0 RANGE BETWEEN UNBOUNDED PRECEDING AND 
CURRENT ROW], select=[c, sum_b, w0$o0, w0$o1, w1$o0])
    +- Exchange(distribution=[forward])
       +- Sort(orderBy=[c ASC])
          +- Exchange(distribution=[hash[c]])
@@ -1340,7 +1340,7 @@ LogicalProject(sum_b=[$0], avg_b=[/(CASE(>(COUNT($0) OVER 
(), 0), $SUM0($0) OVER
     <Resource name="optimized exec plan">
       <![CDATA[
 Calc(select=[sum_b, (CASE((w0$o0 > 0), w0$o1, null:BIGINT) / w0$o0) AS avg_b])
-+- OverAggregate(window#0=[COUNT(sum_b) AS w0$o0, $SUM0(sum_b) AS w0$o1 RANG 
BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING], select=[sum_b, w0$o0, 
w0$o1])
++- OverAggregate(window#0=[COUNT(sum_b) AS w0$o0, $SUM0(sum_b) AS w0$o1 RANGE 
BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING], select=[sum_b, w0$o0, 
w0$o1])
    +- HashAggregate(isMerge=[true], select=[Final_SUM(sum$0) AS sum_b])
       +- Exchange(distribution=[single])
          +- LocalHashAggregate(select=[Partial_SUM(b) AS sum$0])
diff --git 
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/SubplanReuseTest.xml
 
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/SubplanReuseTest.xml
index 281a28978e7..4fb095acffc 100644
--- 
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/SubplanReuseTest.xml
+++ 
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/SubplanReuseTest.xml
@@ -344,7 +344,7 @@ Calc(select=[c, e, avg_b, sum_b, sum_b0 AS psum, sum_b1 AS 
nsum, avg_b0 AS avg_b
 +- MultipleInput(readOrder=[2,0,1], members=[\nHashJoin(joinType=[InnerJoin], 
where=[((c = c0) AND (e = e0) AND (rn = $f5))], select=[sum_b, avg_b, rn, c, e, 
sum_b0, avg_b0, sum_b1, c0, e0, $f5], build=[left])\n:- Calc(select=[sum_b, 
avg_b, rn, c, e, sum_b0, avg_b0])\n:  +- HashJoin(joinType=[InnerJoin], 
where=[((c = c0) AND (e = e0) AND (rn = $f5))], select=[sum_b, avg_b, rn, c, e, 
sum_b0, avg_b0, c0, e0, $f5], build=[left])\n:     :- [#2] 
Exchange(distribution=[hash[c, e, rn]])\n:      [...]
    :- Exchange(distribution=[hash[c, e, $f5]])
    :  +- Calc(select=[sum_b, c, e, (w1$o0 - 1) AS $f5], where=[(c <> '')])
-   :     +- OverAggregate(partitionBy=[c, e], window#0=[COUNT(sum_b) AS w0$o0, 
$SUM0(sum_b) AS w0$o1 RANG BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED 
FOLLOWING], window#1=[RANK(*) AS w1$o0 RANG BETWEEN UNBOUNDED PRECEDING AND 
CURRENT ROW], select=[c, e, sum_b, w0$o0, w0$o1, w1$o0])(reuse_id=[1])
+   :     +- OverAggregate(partitionBy=[c, e], window#0=[COUNT(sum_b) AS w0$o0, 
$SUM0(sum_b) AS w0$o1 RANGE BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED 
FOLLOWING], window#1=[RANK(*) AS w1$o0 RANGE BETWEEN UNBOUNDED PRECEDING AND 
CURRENT ROW], select=[c, e, sum_b, w0$o0, w0$o1, w1$o0])(reuse_id=[1])
    :        +- Exchange(distribution=[forward])
    :           +- Sort(orderBy=[c ASC, e ASC])
    :              +- Exchange(distribution=[keep_input_as_is[hash[c, e]]])
@@ -1067,7 +1067,7 @@ LogicalProject(a=[$0], b=[$1], EXPR$2=[$2], a0=[$3], 
b0=[$4], EXPR$20=[$5])
 HashJoin(joinType=[InnerJoin], where=[(a = a0)], select=[a, b, w0$o0, a0, b0, 
w0$o00], build=[left])
 :- Exchange(distribution=[hash[a]])
 :  +- Calc(select=[a, b, w0$o0], where=[(b < 100)])
-:     +- OverAggregate(partitionBy=[c], orderBy=[c DESC], window#0=[MyFirst(c) 
AS w0$o0 RANG BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], select=[a, b, c, 
w0$o0])(reuse_id=[1])
+:     +- OverAggregate(partitionBy=[c], orderBy=[c DESC], window#0=[MyFirst(c) 
AS w0$o0 RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], select=[a, b, c, 
w0$o0])(reuse_id=[1])
 :        +- Exchange(distribution=[forward])
 :           +- Sort(orderBy=[c DESC])
 :              +- Exchange(distribution=[hash[c]])
@@ -1176,7 +1176,7 @@ LogicalProject(a=[$0], b=[$1], EXPR$2=[$2], a0=[$3], 
b0=[$4], EXPR$20=[$5])
 HashJoin(joinType=[InnerJoin], where=[(a = a0)], select=[a, b, w0$o0, a0, b0, 
w0$o00], build=[left])
 :- Exchange(distribution=[hash[a]])
 :  +- Calc(select=[a, b, w0$o0], where=[(b < 100)])
-:     +- OverAggregate(orderBy=[c DESC], window#0=[RANK(*) AS w0$o0 RANG 
BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], select=[a, b, c, 
w0$o0])(reuse_id=[1])
+:     +- OverAggregate(orderBy=[c DESC], window#0=[RANK(*) AS w0$o0 RANGE 
BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], select=[a, b, c, 
w0$o0])(reuse_id=[1])
 :        +- Sort(orderBy=[c DESC])
 :           +- Exchange(distribution=[single])
 :              +- TableSourceScan(table=[[default_catalog, default_database, 
x]], fields=[a, b, c])
diff --git 
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/agg/AggregateReduceGroupingTest.xml
 
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/agg/AggregateReduceGroupingTest.xml
index 855646fe424..8f583b68198 100644
--- 
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/agg/AggregateReduceGroupingTest.xml
+++ 
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/agg/AggregateReduceGroupingTest.xml
@@ -273,7 +273,7 @@ Calc(select=[a1, b1, c, EXPR$3])
    +- Exchange(distribution=[hash[a1, c]])
       +- LocalHashAggregate(groupBy=[a1, c], auxGrouping=[b1], select=[a1, c, 
b1, Partial_COUNT(d1) AS count$0])
          +- Calc(select=[a1, b1, w0$o0 AS c, d1])
-            +- OverAggregate(partitionBy=[c1], window#0=[COUNT(*) AS w0$o0 
RANG BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING], select=[a1, b1, c1, 
d1, w0$o0])
+            +- OverAggregate(partitionBy=[c1], window#0=[COUNT(*) AS w0$o0 
RANGE BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING], select=[a1, b1, c1, 
d1, w0$o0])
                +- Exchange(distribution=[forward])
                   +- Sort(orderBy=[c1 ASC])
                      +- Exchange(distribution=[hash[c1]])
diff --git 
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/agg/OverAggregateTest.xml
 
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/agg/OverAggregateTest.xml
index d75657f64e5..b9329e59478 100644
--- 
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/agg/OverAggregateTest.xml
+++ 
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/agg/OverAggregateTest.xml
@@ -37,15 +37,15 @@ LogicalProject(EXPR$0=[CASE(>(COUNT($0) OVER (PARTITION BY 
$1 ORDER BY $0 NULLS
     <Resource name="optimized exec plan">
       <![CDATA[
 Calc(select=[CASE((w0$o0 > 0), w0$o1, null:INTEGER) AS EXPR$0, w1$o0 AS 
EXPR$1, CAST((CASE((w2$o0 > 0), w2$o1, null:INTEGER) / w2$o0) AS INTEGER) AS 
EXPR$2, w0$o2 AS EXPR$3, w2$o2 AS EXPR$4])
-+- OverAggregate(partitionBy=[c], orderBy=[a ASC], window#0=[COUNT(a) AS 
w2$o0, $SUM0(a) AS w2$o1, MIN(a) AS w2$o2 RANG BETWEEN UNBOUNDED PRECEDING AND 
CURRENT ROW], select=[a, b, c, w0$o0, w0$o1, w0$o2, w1$o0, w2$o0, w2$o1, w2$o2])
++- OverAggregate(partitionBy=[c], orderBy=[a ASC], window#0=[COUNT(a) AS 
w2$o0, $SUM0(a) AS w2$o1, MIN(a) AS w2$o2 RANGE BETWEEN UNBOUNDED PRECEDING AND 
CURRENT ROW], select=[a, b, c, w0$o0, w0$o1, w0$o2, w1$o0, w2$o0, w2$o1, w2$o2])
    +- Exchange(distribution=[forward])
       +- Sort(orderBy=[c ASC, a ASC])
          +- Exchange(distribution=[hash[c]])
-            +- OverAggregate(partitionBy=[b], orderBy=[c ASC], 
window#0=[MAX(a) AS w1$o0 RANG BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], 
select=[a, b, c, w0$o0, w0$o1, w0$o2, w1$o0])
+            +- OverAggregate(partitionBy=[b], orderBy=[c ASC], 
window#0=[MAX(a) AS w1$o0 RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], 
select=[a, b, c, w0$o0, w0$o1, w0$o2, w1$o0])
                +- Exchange(distribution=[forward])
                   +- Sort(orderBy=[b ASC, c ASC])
                      +- Exchange(distribution=[keep_input_as_is[hash[b]]])
-                        +- OverAggregate(partitionBy=[b], orderBy=[a ASC], 
window#0=[COUNT(a) AS w0$o0, $SUM0(a) AS w0$o1 RANG BETWEEN UNBOUNDED PRECEDING 
AND CURRENT ROW], window#1=[RANK(*) AS w0$o2 RANG BETWEEN UNBOUNDED PRECEDING 
AND CURRENT ROW], select=[a, b, c, w0$o0, w0$o1, w0$o2])
+                        +- OverAggregate(partitionBy=[b], orderBy=[a ASC], 
window#0=[COUNT(a) AS w0$o0, $SUM0(a) AS w0$o1 RANGE BETWEEN UNBOUNDED 
PRECEDING AND CURRENT ROW], window#1=[RANK(*) AS w0$o2 RANGE BETWEEN UNBOUNDED 
PRECEDING AND CURRENT ROW], select=[a, b, c, w0$o0, w0$o1, w0$o2])
                            +- Exchange(distribution=[forward])
                               +- Sort(orderBy=[b ASC, a ASC])
                                  +- Exchange(distribution=[hash[b]])
@@ -74,19 +74,19 @@ LogicalProject(EXPR$0=[CASE(>(COUNT($0) OVER (PARTITION BY 
$1 ORDER BY $2 NULLS
     <Resource name="optimized exec plan">
       <![CDATA[
 Calc(select=[CASE((w0$o0 > 0), w0$o1, null:INTEGER) AS EXPR$0, w1$o0 AS 
EXPR$1, w2$o0 AS EXPR$2, w0$o2 AS EXPR$3, CAST((CASE((w3$o0 > 0), w3$o1, 
null:INTEGER) / w3$o0) AS INTEGER) AS EXPR$4])
-+- OverAggregate(partitionBy=[c], orderBy=[a ASC], window#0=[MAX(a) AS w0$o0 
RANG BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], select=[a, b, c, w0$o2, 
w1$o0, w0$o1, w3$o0, w3$o1, w2$o0, w0$o0])
++- OverAggregate(partitionBy=[c], orderBy=[a ASC], window#0=[MAX(a) AS w0$o0 
RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], select=[a, b, c, w0$o2, 
w1$o0, w0$o1, w3$o0, w3$o1, w2$o0, w0$o0])
    +- Exchange(distribution=[forward])
       +- Sort(orderBy=[c ASC, a ASC])
          +- Exchange(distribution=[hash[c]])
-            +- OverAggregate(partitionBy=[b], orderBy=[c ASC], 
window#0=[COUNT(a) AS w3$o0, $SUM0(a) AS w3$o1 RANG BETWEEN UNBOUNDED PRECEDING 
AND CURRENT ROW], window#1=[RANK(*) AS w2$o0 RANG BETWEEN UNBOUNDED PRECEDING 
AND CURRENT ROW], select=[a, b, c, w0$o2, w1$o0, w0$o1, w3$o0, w3$o1, w2$o0])
+            +- OverAggregate(partitionBy=[b], orderBy=[c ASC], 
window#0=[COUNT(a) AS w3$o0, $SUM0(a) AS w3$o1 RANGE BETWEEN UNBOUNDED 
PRECEDING AND CURRENT ROW], window#1=[RANK(*) AS w2$o0 RANGE BETWEEN UNBOUNDED 
PRECEDING AND CURRENT ROW], select=[a, b, c, w0$o2, w1$o0, w0$o1, w3$o0, w3$o1, 
w2$o0])
                +- Exchange(distribution=[forward])
                   +- Sort(orderBy=[b ASC, c ASC])
                      +- Exchange(distribution=[hash[b]])
-                        +- OverAggregate(orderBy=[c ASC, a ASC], 
window#0=[MIN(a) AS w0$o1 RANG BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], 
select=[a, b, c, w0$o2, w1$o0, w0$o1])
+                        +- OverAggregate(orderBy=[c ASC, a ASC], 
window#0=[MIN(a) AS w0$o1 RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], 
select=[a, b, c, w0$o2, w1$o0, w0$o1])
                            +- Exchange(distribution=[forward])
                               +- Sort(orderBy=[c ASC, a ASC])
                                  +- Exchange(distribution=[forward])
-                                    +- OverAggregate(orderBy=[b ASC], 
window#0=[COUNT(a) AS w0$o2, $SUM0(a) AS w1$o0 RANG BETWEEN UNBOUNDED PRECEDING 
AND CURRENT ROW], select=[a, b, c, w0$o2, w1$o0])
+                                    +- OverAggregate(orderBy=[b ASC], 
window#0=[COUNT(a) AS w0$o2, $SUM0(a) AS w1$o0 RANGE BETWEEN UNBOUNDED 
PRECEDING AND CURRENT ROW], select=[a, b, c, w0$o2, w1$o0])
                                        +- Sort(orderBy=[b ASC])
                                           +- Exchange(distribution=[single])
                                              +- 
TableSourceScan(table=[[default_catalog, default_database, MyTable]], 
fields=[a, b, c])
@@ -111,11 +111,11 @@ LogicalProject(EXPR$0=[CASE(>(COUNT($0) OVER (PARTITION 
BY $1 ORDER BY $0 NULLS
     <Resource name="optimized exec plan">
       <![CDATA[
 Calc(select=[CASE((w0$o0 > 0), w0$o1, null:INTEGER) AS EXPR$0, w1$o0 AS 
EXPR$1])
-+- OverAggregate(partitionBy=[c], orderBy=[a ASC], window#0=[MAX(a) AS w1$o0 
RANG BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], select=[a, b, c, w0$o0, 
w0$o1, w1$o0])
++- OverAggregate(partitionBy=[c], orderBy=[a ASC], window#0=[MAX(a) AS w1$o0 
RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], select=[a, b, c, w0$o0, 
w0$o1, w1$o0])
    +- Exchange(distribution=[forward])
       +- Sort(orderBy=[c ASC, a ASC])
          +- Exchange(distribution=[hash[c]])
-            +- OverAggregate(partitionBy=[b], orderBy=[a ASC], 
window#0=[COUNT(a) AS w0$o0, $SUM0(a) AS w0$o1 RANG BETWEEN UNBOUNDED PRECEDING 
AND CURRENT ROW], select=[a, b, c, w0$o0, w0$o1])
+            +- OverAggregate(partitionBy=[b], orderBy=[a ASC], 
window#0=[COUNT(a) AS w0$o0, $SUM0(a) AS w0$o1 RANGE BETWEEN UNBOUNDED 
PRECEDING AND CURRENT ROW], select=[a, b, c, w0$o0, w0$o1])
                +- Exchange(distribution=[forward])
                   +- Sort(orderBy=[b ASC, a ASC])
                      +- Exchange(distribution=[hash[b]])
@@ -146,7 +146,7 @@ LogicalProject(EXPR$0=[COUNT() OVER (PARTITION BY $2 ORDER 
BY $0 NULLS FIRST RAN
 Calc(select=[w0$o0 AS EXPR$0, CASE((w1$o0 > 0), w1$o1, null:INTEGER) AS 
EXPR$1, w2$o0 AS EXPR$2, CASE((w3$o0 > 0), w3$o1, null:INTEGER) AS EXPR$3, 
w4$o0 AS EXPR$4])
 +- OverAggregate(partitionBy=[c], orderBy=[c ASC], window#0=[COUNT(*) AS w4$o0 
ROWS BETWEEN 1 PRECEDING AND 10 FOLLOWING], select=[a, c, w0$o0, w1$o0, w1$o1, 
w3$o1, w2$o0, w3$o0, w4$o0])
    +- Exchange(distribution=[forward])
-      +- OverAggregate(partitionBy=[c], orderBy=[a ASC], window#0=[COUNT(*) AS 
w0$o0 RANG BETWEEN -1 PRECEDING AND 10 FOLLOWING], window#1=[COUNT(a) AS w1$o0, 
$SUM0(a) AS w1$o1 RANG BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], 
window#2=[COUNT(a) AS w3$o1, $SUM0(a) AS w2$o0 RANG BETWEEN 1 PRECEDING AND 10 
FOLLOWING], window#3=[RANK(*) AS w3$o0 RANG BETWEEN UNBOUNDED PRECEDING AND 
CURRENT ROW], select=[a, c, w0$o0, w1$o0, w1$o1, w3$o1, w2$o0, w3$o0])
+      +- OverAggregate(partitionBy=[c], orderBy=[a ASC], window#0=[COUNT(*) AS 
w0$o0 RANGE BETWEEN -1 PRECEDING AND 10 FOLLOWING], window#1=[COUNT(a) AS 
w1$o0, $SUM0(a) AS w1$o1 RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], 
window#2=[COUNT(a) AS w3$o1, $SUM0(a) AS w2$o0 RANGE BETWEEN 1 PRECEDING AND 10 
FOLLOWING], window#3=[RANK(*) AS w3$o0 RANGE BETWEEN UNBOUNDED PRECEDING AND 
CURRENT ROW], select=[a, c, w0$o0, w1$o0, w1$o1, w3$o1, w2$o0, w3$o0])
          +- Exchange(distribution=[forward])
             +- Sort(orderBy=[c ASC, a ASC])
                +- Exchange(distribution=[hash[c]])
@@ -250,7 +250,7 @@ LogicalProject(EXPR$0=[COUNT() OVER (PARTITION BY $2 ORDER 
BY $0 NULLS FIRST RAN
     <Resource name="optimized exec plan">
       <![CDATA[
 Calc(select=[w0$o0 AS $0])
-+- OverAggregate(partitionBy=[c], orderBy=[a ASC], window#0=[COUNT(*) AS w0$o0 
RANG BETWEEN -1 PRECEDING AND 10 FOLLOWING], select=[a, c, w0$o0])
++- OverAggregate(partitionBy=[c], orderBy=[a ASC], window#0=[COUNT(*) AS w0$o0 
RANGE BETWEEN -1 PRECEDING AND 10 FOLLOWING], select=[a, c, w0$o0])
    +- Exchange(distribution=[forward])
       +- Sort(orderBy=[c ASC, a ASC])
          +- Exchange(distribution=[hash[c]])
@@ -271,7 +271,7 @@ LogicalProject(c=[$2], EXPR$1=[COUNT() OVER (PARTITION BY 
$2)])
     </Resource>
     <Resource name="optimized exec plan">
       <![CDATA[
-OverAggregate(partitionBy=[c], window#0=[COUNT(*) AS w0$o0 RANG BETWEEN 
UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING], select=[c, w0$o0])
+OverAggregate(partitionBy=[c], window#0=[COUNT(*) AS w0$o0 RANGE BETWEEN 
UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING], select=[c, w0$o0])
 +- Exchange(distribution=[forward])
    +- Sort(orderBy=[c ASC])
       +- Exchange(distribution=[hash[c]])
@@ -293,7 +293,7 @@ LogicalProject(a=[$0], EXPR$1=[DENSE_RANK() OVER (PARTITION 
BY $0 ORDER BY $3 NU
                <Resource name="optimized exec plan">
                        <![CDATA[
 Calc(select=[a, w0$o0 AS $1])
-+- OverAggregate(partitionBy=[a], orderBy=[proctime ASC], 
window#0=[DENSE_RANK(*) AS w0$o0 RANG BETWEEN UNBOUNDED PRECEDING AND CURRENT 
ROW], select=[a, proctime, w0$o0])
++- OverAggregate(partitionBy=[a], orderBy=[proctime ASC], 
window#0=[DENSE_RANK(*) AS w0$o0 RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT 
ROW], select=[a, proctime, w0$o0])
    +- Exchange(distribution=[forward])
       +- Sort(orderBy=[a ASC, proctime ASC])
          +- Exchange(distribution=[hash[a]])
@@ -315,7 +315,7 @@ LogicalProject(a=[$0], EXPR$1=[RANK() OVER (PARTITION BY $0 
ORDER BY $3 NULLS FI
                <Resource name="optimized exec plan">
                        <![CDATA[
 Calc(select=[a, w0$o0 AS $1])
-+- OverAggregate(partitionBy=[a], orderBy=[proctime ASC], window#0=[RANK(*) AS 
w0$o0 RANG BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], select=[a, proctime, 
w0$o0])
++- OverAggregate(partitionBy=[a], orderBy=[proctime ASC], window#0=[RANK(*) AS 
w0$o0 RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], select=[a, proctime, 
w0$o0])
    +- Exchange(distribution=[forward])
       +- Sort(orderBy=[a ASC, proctime ASC])
          +- Exchange(distribution=[hash[a]])
@@ -337,7 +337,7 @@ LogicalProject(c=[$2], EXPR$1=[CASE(>(COUNT($0) OVER (ORDER 
BY $1 NULLS FIRST),
     <Resource name="optimized exec plan">
       <![CDATA[
 Calc(select=[c, CASE((w0$o0 > 0), w0$o1, null:INTEGER) AS EXPR$1])
-+- OverAggregate(orderBy=[b ASC], window#0=[COUNT(a) AS w0$o0, $SUM0(a) AS 
w0$o1 RANG BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], select=[a, b, c, 
w0$o0, w0$o1])
++- OverAggregate(orderBy=[b ASC], window#0=[COUNT(a) AS w0$o0, $SUM0(a) AS 
w0$o1 RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], select=[a, b, c, 
w0$o0, w0$o1])
    +- Sort(orderBy=[b ASC])
       +- Exchange(distribution=[single])
          +- TableSourceScan(table=[[default_catalog, default_database, 
MyTable]], fields=[a, b, c])
@@ -356,7 +356,7 @@ LogicalProject(c=[$2], EXPR$1=[COUNT() OVER ()])
     </Resource>
     <Resource name="optimized exec plan">
       <![CDATA[
-OverAggregate(window#0=[COUNT(*) AS w0$o0 RANG BETWEEN UNBOUNDED PRECEDING AND 
UNBOUNDED FOLLOWING], select=[c, w0$o0])
+OverAggregate(window#0=[COUNT(*) AS w0$o0 RANGE BETWEEN UNBOUNDED PRECEDING 
AND UNBOUNDED FOLLOWING], select=[c, w0$o0])
 +- Exchange(distribution=[single])
    +- Calc(select=[c])
       +- TableSourceScan(table=[[default_catalog, default_database, MyTable]], 
fields=[a, b, c])
@@ -380,7 +380,7 @@ LogicalProject(EXPR$0=[COUNT() OVER (PARTITION BY $2 ORDER 
BY $0 NULLS FIRST RAN
     <Resource name="optimized exec plan">
       <![CDATA[
 Calc(select=[w0$o0 AS $0])
-+- OverAggregate(partitionBy=[c], orderBy=[a ASC], window#0=[COUNT(*) AS w0$o0 
RANG BETWEEN -1 PRECEDING AND 10 FOLLOWING], select=[a, c, w0$o0])
++- OverAggregate(partitionBy=[c], orderBy=[a ASC], window#0=[COUNT(*) AS w0$o0 
RANGE BETWEEN -1 PRECEDING AND 10 FOLLOWING], select=[a, c, w0$o0])
    +- Exchange(distribution=[forward])
       +- Sort(orderBy=[c ASC, a ASC])
          +- Exchange(distribution=[hash[c]])
@@ -406,7 +406,7 @@ LogicalProject(EXPR$0=[COUNT() OVER (PARTITION BY $2 ORDER 
BY $0 NULLS FIRST RAN
     <Resource name="optimized exec plan">
       <![CDATA[
 Calc(select=[w0$o0 AS $0])
-+- OverAggregate(partitionBy=[c], orderBy=[a ASC], window#0=[COUNT(*) AS w0$o0 
RANG BETWEEN -1 FOLLOWING AND 10 FOLLOWING], select=[a, c, w0$o0])
++- OverAggregate(partitionBy=[c], orderBy=[a ASC], window#0=[COUNT(*) AS w0$o0 
RANGE BETWEEN -1 FOLLOWING AND 10 FOLLOWING], select=[a, c, w0$o0])
    +- Exchange(distribution=[forward])
       +- Sort(orderBy=[c ASC, a ASC])
          +- Exchange(distribution=[hash[c]])
@@ -428,7 +428,7 @@ LogicalProject(EXPR$0=[RANK() OVER (PARTITION BY $2 ORDER 
BY $0 NULLS FIRST, $2
     <Resource name="optimized exec plan">
       <![CDATA[
 Calc(select=[w0$o0 AS $0])
-+- OverAggregate(partitionBy=[c], orderBy=[a ASC, c ASC], window#0=[RANK(*) AS 
w0$o0 RANG BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], select=[a, c, w0$o0])
++- OverAggregate(partitionBy=[c], orderBy=[a ASC, c ASC], window#0=[RANK(*) AS 
w0$o0 RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], select=[a, c, w0$o0])
    +- Exchange(distribution=[forward])
       +- Sort(orderBy=[c ASC, a ASC])
          +- Exchange(distribution=[hash[c]])
@@ -449,7 +449,7 @@ LogicalProject(EXPR$0=[COUNT(1) OVER ()])
     </Resource>
     <Resource name="optimized exec plan">
       <![CDATA[
-OverAggregate(window#0=[COUNT(1) AS w0$o0 RANG BETWEEN UNBOUNDED PRECEDING AND 
UNBOUNDED FOLLOWING], select=[w0$o0])
+OverAggregate(window#0=[COUNT(1) AS w0$o0 RANGE BETWEEN UNBOUNDED PRECEDING 
AND UNBOUNDED FOLLOWING], select=[w0$o0])
 +- Exchange(distribution=[single])
    +- Calc(select=[])
       +- TableSourceScan(table=[[default_catalog, default_database, MyTable]], 
fields=[a, b, c])
@@ -474,7 +474,7 @@ LogicalProject(EXPR$0=[CASE(>(COUNT($0) OVER (PARTITION BY 
$2 ORDER BY $0 NULLS
     <Resource name="optimized exec plan">
       <![CDATA[
 Calc(select=[CASE((w0$o0 > 0), w0$o1, null:INTEGER) AS EXPR$0, w0$o2 AS 
EXPR$1])
-+- OverAggregate(partitionBy=[c], orderBy=[a ASC], window#0=[COUNT(a) AS 
w0$o0, $SUM0(a) AS w0$o1, COUNT(1) AS w0$o2 RANG BETWEEN -1 FOLLOWING AND 10 
FOLLOWING], select=[a, c, w0$o0, w0$o1, w0$o2])
++- OverAggregate(partitionBy=[c], orderBy=[a ASC], window#0=[COUNT(a) AS 
w0$o0, $SUM0(a) AS w0$o1, COUNT(1) AS w0$o2 RANGE BETWEEN -1 FOLLOWING AND 10 
FOLLOWING], select=[a, c, w0$o0, w0$o1, w0$o2])
    +- Exchange(distribution=[forward])
       +- Sort(orderBy=[c ASC, a ASC])
          +- Exchange(distribution=[hash[c]])
@@ -501,11 +501,11 @@ LogicalProject(EXPR$0=[COUNT(2) OVER (ORDER BY $0 NULLS 
FIRST)], EXPR$1=[COUNT(1
     <Resource name="optimized exec plan">
       <![CDATA[
 Calc(select=[w0$o0 AS $0, w1$o0 AS $1])
-+- OverAggregate(partitionBy=[c], orderBy=[a ASC], window#0=[COUNT(1) AS w1$o0 
RANG BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], select=[a, c, w0$o0, w1$o0])
++- OverAggregate(partitionBy=[c], orderBy=[a ASC], window#0=[COUNT(1) AS w1$o0 
RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], select=[a, c, w0$o0, w1$o0])
    +- Exchange(distribution=[forward])
       +- Sort(orderBy=[c ASC, a ASC])
          +- Exchange(distribution=[hash[c]])
-            +- OverAggregate(orderBy=[a ASC], window#0=[COUNT(2) AS w0$o0 RANG 
BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], select=[a, c, w0$o0])
+            +- OverAggregate(orderBy=[a ASC], window#0=[COUNT(2) AS w0$o0 
RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], select=[a, c, w0$o0])
                +- Sort(orderBy=[a ASC])
                   +- Exchange(distribution=[single])
                      +- Calc(select=[a, c])
@@ -534,13 +534,13 @@ LogicalProject(EXPR$0=[CASE(>(COUNT($0) OVER (PARTITION 
BY $1 ORDER BY $2 NULLS
     <Resource name="optimized exec plan">
       <![CDATA[
 Calc(select=[CASE((w0$o0 > 0), w0$o1, null:INTEGER) AS EXPR$0, w1$o0 AS 
EXPR$1, CAST((CASE((w2$o0 > 0), w2$o1, null:INTEGER) / w2$o0) AS INTEGER) AS 
EXPR$2, w0$o2 AS EXPR$3, w1$o1 AS EXPR$4])
-+- OverAggregate(partitionBy=[b], orderBy=[c ASC], window#0=[COUNT(a) AS 
w2$o0, $SUM0(a) AS w2$o1 RANG BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], 
window#1=[RANK(*) AS w1$o0 RANG BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], 
select=[a, b, c, w0$o1, w0$o2, w1$o1, w0$o0, w2$o0, w2$o1, w1$o0])
++- OverAggregate(partitionBy=[b], orderBy=[c ASC], window#0=[COUNT(a) AS 
w2$o0, $SUM0(a) AS w2$o1 RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], 
window#1=[RANK(*) AS w1$o0 RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], 
select=[a, b, c, w0$o1, w0$o2, w1$o1, w0$o0, w2$o0, w2$o1, w1$o0])
    +- Exchange(distribution=[forward])
       +- Sort(orderBy=[b ASC, c ASC])
          +- Exchange(distribution=[keep_input_as_is[hash[b]]])
-            +- OverAggregate(partitionBy=[b], orderBy=[b ASC], 
window#0=[MAX(a) AS w1$o1, MIN(a) AS w0$o0 RANG BETWEEN UNBOUNDED PRECEDING AND 
CURRENT ROW], select=[a, b, c, w0$o1, w0$o2, w1$o1, w0$o0])
+            +- OverAggregate(partitionBy=[b], orderBy=[b ASC], 
window#0=[MAX(a) AS w1$o1, MIN(a) AS w0$o0 RANGE BETWEEN UNBOUNDED PRECEDING 
AND CURRENT ROW], select=[a, b, c, w0$o1, w0$o2, w1$o1, w0$o0])
                +- Exchange(distribution=[forward])
-                  +- OverAggregate(partitionBy=[b], orderBy=[a ASC], 
window#0=[COUNT(a) AS w0$o1, $SUM0(a) AS w0$o2 RANG BETWEEN UNBOUNDED PRECEDING 
AND CURRENT ROW], select=[a, b, c, w0$o1, w0$o2])
+                  +- OverAggregate(partitionBy=[b], orderBy=[a ASC], 
window#0=[COUNT(a) AS w0$o1, $SUM0(a) AS w0$o2 RANGE BETWEEN UNBOUNDED 
PRECEDING AND CURRENT ROW], select=[a, b, c, w0$o1, w0$o2])
                      +- Exchange(distribution=[forward])
                         +- Sort(orderBy=[b ASC, a ASC])
                            +- Exchange(distribution=[hash[b]])
@@ -569,17 +569,17 @@ LogicalProject(EXPR$0=[CASE(>(COUNT($0) OVER (PARTITION 
BY $1 ORDER BY $2 NULLS
     <Resource name="optimized exec plan">
       <![CDATA[
 Calc(select=[CASE((w0$o0 > 0), w0$o1, null:INTEGER) AS EXPR$0, w1$o0 AS 
EXPR$1, CAST((CASE((w2$o0 > 0), w2$o1, null:INTEGER) / w2$o0) AS INTEGER) AS 
EXPR$2, w3$o0 AS EXPR$3, w4$o0 AS EXPR$4])
-+- OverAggregate(partitionBy=[b], orderBy=[c ASC], window#0=[COUNT(a) AS 
w1$o0, $SUM0(a) AS w3$o0 RANG BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], 
select=[a, b, c, w2$o0, w0$o0, w2$o1, w4$o0, w0$o1, w1$o0, w3$o0])
++- OverAggregate(partitionBy=[b], orderBy=[c ASC], window#0=[COUNT(a) AS 
w1$o0, $SUM0(a) AS w3$o0 RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], 
select=[a, b, c, w2$o0, w0$o0, w2$o1, w4$o0, w0$o1, w1$o0, w3$o0])
    +- Exchange(distribution=[forward])
       +- Sort(orderBy=[b ASC, c ASC])
          +- Exchange(distribution=[keep_input_as_is[hash[b]]])
-            +- OverAggregate(partitionBy=[b], orderBy=[b ASC], 
window#0=[MIN(a) AS w0$o1 RANG BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], 
select=[a, b, c, w2$o0, w0$o0, w2$o1, w4$o0, w0$o1])
+            +- OverAggregate(partitionBy=[b], orderBy=[b ASC], 
window#0=[MIN(a) AS w0$o1 RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], 
select=[a, b, c, w2$o0, w0$o0, w2$o1, w4$o0, w0$o1])
                +- Exchange(distribution=[forward])
-                  +- OverAggregate(partitionBy=[b], orderBy=[a ASC, c ASC], 
window#0=[COUNT(a) AS w2$o1, $SUM0(a) AS w4$o0 RANG BETWEEN UNBOUNDED PRECEDING 
AND CURRENT ROW], select=[a, b, c, w2$o0, w0$o0, w2$o1, w4$o0])
+                  +- OverAggregate(partitionBy=[b], orderBy=[a ASC, c ASC], 
window#0=[COUNT(a) AS w2$o1, $SUM0(a) AS w4$o0 RANGE BETWEEN UNBOUNDED 
PRECEDING AND CURRENT ROW], select=[a, b, c, w2$o0, w0$o0, w2$o1, w4$o0])
                      +- Exchange(distribution=[forward])
                         +- Sort(orderBy=[b ASC, a ASC, c ASC])
                            +- 
Exchange(distribution=[keep_input_as_is[hash[b]]])
-                              +- OverAggregate(partitionBy=[b], orderBy=[a 
ASC], window#0=[MAX(a) AS w2$o0 RANG BETWEEN UNBOUNDED PRECEDING AND CURRENT 
ROW], window#1=[RANK(*) AS w0$o0 RANG BETWEEN UNBOUNDED PRECEDING AND CURRENT 
ROW], select=[a, b, c, w2$o0, w0$o0])
+                              +- OverAggregate(partitionBy=[b], orderBy=[a 
ASC], window#0=[MAX(a) AS w2$o0 RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT 
ROW], window#1=[RANK(*) AS w0$o0 RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT 
ROW], select=[a, b, c, w2$o0, w0$o0])
                                  +- Exchange(distribution=[forward])
                                     +- Sort(orderBy=[b ASC, a ASC])
                                        +- Exchange(distribution=[hash[b]])
@@ -605,7 +605,7 @@ LogicalProject(EXPR$0=[CASE(>(COUNT($0) OVER (PARTITION BY 
$1 ORDER BY $0 NULLS
     <Resource name="optimized exec plan">
       <![CDATA[
 Calc(select=[CASE((w0$o0 > 0), w0$o1, null:INTEGER) AS EXPR$0, w0$o2 AS 
EXPR$1])
-+- OverAggregate(partitionBy=[b], orderBy=[a ASC], window#0=[COUNT(a) AS 
w0$o0, $SUM0(a) AS w0$o1, MAX(a) AS w0$o2 RANG BETWEEN UNBOUNDED PRECEDING AND 
CURRENT ROW], select=[a, b, w0$o0, w0$o1, w0$o2])
++- OverAggregate(partitionBy=[b], orderBy=[a ASC], window#0=[COUNT(a) AS 
w0$o0, $SUM0(a) AS w0$o1, MAX(a) AS w0$o2 RANGE BETWEEN UNBOUNDED PRECEDING AND 
CURRENT ROW], select=[a, b, w0$o0, w0$o1, w0$o2])
    +- Exchange(distribution=[forward])
       +- Sort(orderBy=[b ASC, a ASC])
          +- Exchange(distribution=[hash[b]])
@@ -635,11 +635,11 @@ LogicalProject(EXPR$0=[CASE(>(COUNT($0) OVER (PARTITION 
BY $1 ORDER BY $0 NULLS
     <Resource name="optimized exec plan">
       <![CDATA[
 Calc(select=[CASE((w0$o0 > 0), w0$o1, null:INTEGER) AS EXPR$0, w0$o2 AS 
EXPR$1, CAST((CASE((w1$o0 > 0), w1$o1, null:INTEGER) / w1$o0) AS INTEGER) AS 
EXPR$2, w0$o3 AS EXPR$3, w1$o2 AS EXPR$4])
-+- OverAggregate(partitionBy=[b], orderBy=[a DESC], window#0=[COUNT(a) AS 
w1$o0, $SUM0(a) AS w1$o1, MIN(a) AS w1$o2 RANG BETWEEN UNBOUNDED PRECEDING AND 
CURRENT ROW], select=[a, b, w0$o0, w0$o1, w0$o2, w0$o3, w1$o0, w1$o1, w1$o2])
++- OverAggregate(partitionBy=[b], orderBy=[a DESC], window#0=[COUNT(a) AS 
w1$o0, $SUM0(a) AS w1$o1, MIN(a) AS w1$o2 RANGE BETWEEN UNBOUNDED PRECEDING AND 
CURRENT ROW], select=[a, b, w0$o0, w0$o1, w0$o2, w0$o3, w1$o0, w1$o1, w1$o2])
    +- Exchange(distribution=[forward])
       +- Sort(orderBy=[b ASC, a DESC])
          +- Exchange(distribution=[keep_input_as_is[hash[b]]])
-            +- OverAggregate(partitionBy=[b], orderBy=[a ASC], 
window#0=[COUNT(a) AS w0$o0, $SUM0(a) AS w0$o1, MAX(a) AS w0$o2 RANG BETWEEN 
UNBOUNDED PRECEDING AND CURRENT ROW], window#1=[RANK(*) AS w0$o3 RANG BETWEEN 
UNBOUNDED PRECEDING AND CURRENT ROW], select=[a, b, w0$o0, w0$o1, w0$o2, w0$o3])
+            +- OverAggregate(partitionBy=[b], orderBy=[a ASC], 
window#0=[COUNT(a) AS w0$o0, $SUM0(a) AS w0$o1, MAX(a) AS w0$o2 RANGE BETWEEN 
UNBOUNDED PRECEDING AND CURRENT ROW], window#1=[RANK(*) AS w0$o3 RANGE BETWEEN 
UNBOUNDED PRECEDING AND CURRENT ROW], select=[a, b, w0$o0, w0$o1, w0$o2, w0$o3])
                +- Exchange(distribution=[forward])
                   +- Sort(orderBy=[b ASC, a ASC])
                      +- Exchange(distribution=[hash[b]])
@@ -666,11 +666,11 @@ LogicalProject(EXPR$0=[RANK() OVER (PARTITION BY $1 ORDER 
BY $0 DESC NULLS LAST)
     <Resource name="optimized exec plan">
       <![CDATA[
 Calc(select=[w0$o0 AS $0, w1$o0 AS $1])
-+- OverAggregate(partitionBy=[b], orderBy=[a DESC], window#0=[RANK(*) AS w1$o0 
RANG BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], select=[a, b, c, w0$o0, 
w1$o0])
++- OverAggregate(partitionBy=[b], orderBy=[a DESC], window#0=[RANK(*) AS w1$o0 
RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], select=[a, b, c, w0$o0, 
w1$o0])
    +- Exchange(distribution=[forward])
       +- Sort(orderBy=[b ASC, a DESC])
          +- Exchange(distribution=[keep_input_as_is[hash[b]]])
-            +- OverAggregate(partitionBy=[b], orderBy=[a ASC], 
window#0=[RANK(*) AS w0$o0 RANG BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], 
select=[a, b, c, w0$o0])
+            +- OverAggregate(partitionBy=[b], orderBy=[a ASC], 
window#0=[RANK(*) AS w0$o0 RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], 
select=[a, b, c, w0$o0])
                +- Exchange(distribution=[forward])
                   +- Sort(orderBy=[b ASC, a ASC])
                      +- Exchange(distribution=[hash[b]])
@@ -697,11 +697,11 @@ LogicalProject(EXPR$0=[CASE(>(COUNT($0) OVER (PARTITION 
BY $1 ORDER BY $0 NULLS
     <Resource name="optimized exec plan">
       <![CDATA[
 Calc(select=[CASE((w0$o0 > 0), w0$o1, null:INTEGER) AS EXPR$0, w1$o0 AS 
EXPR$1, w0$o2 AS EXPR$2])
-+- OverAggregate(partitionBy=[b], orderBy=[a ASC], window#0=[COUNT(a) AS 
w1$o0, $SUM0(a) AS w0$o0, MAX(a) AS w0$o1 RANG BETWEEN UNBOUNDED PRECEDING AND 
CURRENT ROW], select=[a, b, w0$o2, w1$o0, w0$o0, w0$o1])
++- OverAggregate(partitionBy=[b], orderBy=[a ASC], window#0=[COUNT(a) AS 
w1$o0, $SUM0(a) AS w0$o0, MAX(a) AS w0$o1 RANGE BETWEEN UNBOUNDED PRECEDING AND 
CURRENT ROW], select=[a, b, w0$o2, w1$o0, w0$o0, w0$o1])
    +- Exchange(distribution=[forward])
       +- Sort(orderBy=[b ASC, a ASC])
          +- Exchange(distribution=[keep_input_as_is[hash[b]]])
-            +- OverAggregate(partitionBy=[b], window#0=[MIN(a) AS w0$o2 RANG 
BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING], select=[a, b, w0$o2])
+            +- OverAggregate(partitionBy=[b], window#0=[MIN(a) AS w0$o2 RANGE 
BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING], select=[a, b, w0$o2])
                +- Exchange(distribution=[forward])
                   +- Sort(orderBy=[b ASC])
                      +- Exchange(distribution=[hash[b]])
@@ -728,9 +728,9 @@ LogicalProject(a=[$0], EXPR$1=[RANK() OVER (PARTITION BY $1 
ORDER BY $2 NULLS FI
     <Resource name="optimized exec plan">
       <![CDATA[
 Calc(select=[a, w0$o0 AS $1, w1$o0 AS $2])
-+- OverAggregate(partitionBy=[b], orderBy=[c ASC, b ASC], window#0=[RANK(*) AS 
w1$o0 RANG BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], select=[a, b, c, 
w0$o0, w1$o0])
++- OverAggregate(partitionBy=[b], orderBy=[c ASC, b ASC], window#0=[RANK(*) AS 
w1$o0 RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], select=[a, b, c, 
w0$o0, w1$o0])
    +- Exchange(distribution=[forward])
-      +- OverAggregate(partitionBy=[b], orderBy=[c ASC, a DESC], 
window#0=[RANK(*) AS w0$o0 RANG BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], 
select=[a, b, c, w0$o0])
+      +- OverAggregate(partitionBy=[b], orderBy=[c ASC, a DESC], 
window#0=[RANK(*) AS w0$o0 RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], 
select=[a, b, c, w0$o0])
          +- Exchange(distribution=[forward])
             +- Sort(orderBy=[b ASC, c ASC, a DESC])
                +- Exchange(distribution=[hash[b]])
@@ -763,11 +763,11 @@ LogicalProject(a=[$0], b=[$1], ts=[$2], c1=[$3], c2=[$4])
     </Resource>
     <Resource name="optimized exec plan">
       <![CDATA[
-OverAggregate(partitionBy=[a], orderBy=[ts ASC], window#0=[COUNT(*) AS w0$o0_0 
RANG BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], select=[a, b, ts, w0$o0, 
w0$o0_0])
+OverAggregate(partitionBy=[a], orderBy=[ts ASC], window#0=[COUNT(*) AS w0$o0_0 
RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], select=[a, b, ts, w0$o0, 
w0$o0_0])
 +- Exchange(distribution=[forward])
    +- Sort(orderBy=[a ASC, ts ASC])
       +- Exchange(distribution=[hash[a]])
-         +- OverAggregate(partitionBy=[a, b], orderBy=[ts ASC], 
window#0=[COUNT(*) AS w0$o0 RANG BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], 
select=[a, b, ts, w0$o0])
+         +- OverAggregate(partitionBy=[a, b], orderBy=[ts ASC], 
window#0=[COUNT(*) AS w0$o0 RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], 
select=[a, b, ts, w0$o0])
             +- Exchange(distribution=[forward])
                +- Sort(orderBy=[a ASC, b ASC, ts ASC])
                   +- Exchange(distribution=[hash[a, b]])
diff --git 
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/join/BroadcastHashSemiAntiJoinTest.xml
 
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/join/BroadcastHashSemiAntiJoinTest.xml
index 0f8b17670d4..f613399ae95 100644
--- 
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/join/BroadcastHashSemiAntiJoinTest.xml
+++ 
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/join/BroadcastHashSemiAntiJoinTest.xml
@@ -735,7 +735,7 @@ HashJoin(joinType=[LeftSemiJoin], where=[((b = rk) AND (a 
<> d))], select=[a, b,
 :- TableSourceScan(table=[[default_catalog, default_database, l]], fields=[a, 
b, c])
 +- Exchange(distribution=[broadcast])
    +- Calc(select=[w0$o0 AS rk, d])
-      +- OverAggregate(partitionBy=[d], orderBy=[e ASC], window#0=[RANK(*) AS 
w0$o0 RANG BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], select=[d, e, w0$o0])
+      +- OverAggregate(partitionBy=[d], orderBy=[e ASC], window#0=[RANK(*) AS 
w0$o0 RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], select=[d, e, w0$o0])
          +- Exchange(distribution=[forward])
             +- Sort(orderBy=[d ASC, e ASC])
                +- Exchange(distribution=[hash[d]])
@@ -1167,11 +1167,11 @@ HashJoin(joinType=[LeftSemiJoin], where=[((a = $0) AND 
(b = $1))], select=[a, b,
 :- TableSourceScan(table=[[default_catalog, default_database, l]], fields=[a, 
b, c])
 +- Exchange(distribution=[broadcast])
    +- Calc(select=[w0$o0 AS $0, w1$o0 AS $1])
-      +- OverAggregate(partitionBy=[f], orderBy=[d ASC], window#0=[MIN(e) AS 
w1$o0 RANG BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], select=[d, e, f, 
w0$o0, w1$o0])
+      +- OverAggregate(partitionBy=[f], orderBy=[d ASC], window#0=[MIN(e) AS 
w1$o0 RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], select=[d, e, f, 
w0$o0, w1$o0])
          +- Exchange(distribution=[forward])
             +- Sort(orderBy=[f ASC, d ASC])
                +- Exchange(distribution=[hash[f]])
-                  +- OverAggregate(window#0=[MAX(d) AS w0$o0 RANG BETWEEN 
UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING], select=[d, e, f, w0$o0])
+                  +- OverAggregate(window#0=[MAX(d) AS w0$o0 RANGE BETWEEN 
UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING], select=[d, e, f, w0$o0])
                      +- Exchange(distribution=[single])
                         +- TableSourceScan(table=[[default_catalog, 
default_database, r]], fields=[d, e, f])
 ]]>
diff --git 
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/join/NestedLoopSemiAntiJoinTest.xml
 
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/join/NestedLoopSemiAntiJoinTest.xml
index 094c0b1f4f2..fed6b988a93 100644
--- 
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/join/NestedLoopSemiAntiJoinTest.xml
+++ 
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/join/NestedLoopSemiAntiJoinTest.xml
@@ -917,7 +917,7 @@ NestedLoopJoin(joinType=[LeftSemiJoin], where=[((b = rk) 
AND (a <> d))], select=
 :- TableSourceScan(table=[[default_catalog, default_database, l]], fields=[a, 
b, c])
 +- Exchange(distribution=[broadcast])
    +- Calc(select=[w0$o0 AS rk, d])
-      +- OverAggregate(partitionBy=[d], orderBy=[e ASC], window#0=[RANK(*) AS 
w0$o0 RANG BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], select=[d, e, w0$o0])
+      +- OverAggregate(partitionBy=[d], orderBy=[e ASC], window#0=[RANK(*) AS 
w0$o0 RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], select=[d, e, w0$o0])
          +- Exchange(distribution=[forward])
             +- Sort(orderBy=[d ASC, e ASC])
                +- Exchange(distribution=[hash[d]])
@@ -1434,11 +1434,11 @@ NestedLoopJoin(joinType=[LeftSemiJoin], where=[((a = 
$0) AND (b = $1))], select=
 :- TableSourceScan(table=[[default_catalog, default_database, l]], fields=[a, 
b, c])
 +- Exchange(distribution=[broadcast])
    +- Calc(select=[w0$o0 AS $0, w1$o0 AS $1])
-      +- OverAggregate(partitionBy=[f], orderBy=[d ASC], window#0=[MIN(e) AS 
w1$o0 RANG BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], select=[d, e, f, 
w0$o0, w1$o0])
+      +- OverAggregate(partitionBy=[f], orderBy=[d ASC], window#0=[MIN(e) AS 
w1$o0 RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], select=[d, e, f, 
w0$o0, w1$o0])
          +- Exchange(distribution=[forward])
             +- Sort(orderBy=[f ASC, d ASC])
                +- Exchange(distribution=[hash[f]])
-                  +- OverAggregate(window#0=[MAX(d) AS w0$o0 RANG BETWEEN 
UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING], select=[d, e, f, w0$o0])
+                  +- OverAggregate(window#0=[MAX(d) AS w0$o0 RANGE BETWEEN 
UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING], select=[d, e, f, w0$o0])
                      +- Exchange(distribution=[single])
                         +- TableSourceScan(table=[[default_catalog, 
default_database, r]], fields=[d, e, f])
 ]]>
diff --git 
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/join/SemiAntiJoinTest.xml
 
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/join/SemiAntiJoinTest.xml
index ec20c141adf..a975f2bd917 100644
--- 
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/join/SemiAntiJoinTest.xml
+++ 
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/join/SemiAntiJoinTest.xml
@@ -950,7 +950,7 @@ HashJoin(joinType=[LeftSemiJoin], where=[((b = rk) AND (a 
<> d))], select=[a, b,
 :  +- TableSourceScan(table=[[default_catalog, default_database, l]], 
fields=[a, b, c])
 +- Exchange(distribution=[hash[rk]])
    +- Calc(select=[w0$o0 AS rk, d])
-      +- OverAggregate(partitionBy=[d], orderBy=[e ASC], window#0=[RANK(*) AS 
w0$o0 RANG BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], select=[d, e, w0$o0])
+      +- OverAggregate(partitionBy=[d], orderBy=[e ASC], window#0=[RANK(*) AS 
w0$o0 RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], select=[d, e, w0$o0])
          +- Exchange(distribution=[forward])
             +- Sort(orderBy=[d ASC, e ASC])
                +- Exchange(distribution=[hash[d]])
@@ -1488,11 +1488,11 @@ HashJoin(joinType=[LeftSemiJoin], where=[((a = $0) AND 
(b = $1))], select=[a, b,
 :  +- TableSourceScan(table=[[default_catalog, default_database, l]], 
fields=[a, b, c])
 +- Exchange(distribution=[hash[$0, $1]])
    +- Calc(select=[w0$o0 AS $0, w1$o0 AS $1])
-      +- OverAggregate(partitionBy=[f], orderBy=[d ASC], window#0=[MIN(e) AS 
w1$o0 RANG BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], select=[d, e, f, 
w0$o0, w1$o0])
+      +- OverAggregate(partitionBy=[f], orderBy=[d ASC], window#0=[MIN(e) AS 
w1$o0 RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], select=[d, e, f, 
w0$o0, w1$o0])
          +- Exchange(distribution=[forward])
             +- Sort(orderBy=[f ASC, d ASC])
                +- Exchange(distribution=[hash[f]])
-                  +- OverAggregate(window#0=[MAX(d) AS w0$o0 RANG BETWEEN 
UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING], select=[d, e, f, w0$o0])
+                  +- OverAggregate(window#0=[MAX(d) AS w0$o0 RANGE BETWEEN 
UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING], select=[d, e, f, w0$o0])
                      +- Exchange(distribution=[single])
                         +- TableSourceScan(table=[[default_catalog, 
default_database, r]], fields=[d, e, f])
 ]]>
diff --git 
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/join/ShuffledHashSemiAntiJoinTest.xml
 
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/join/ShuffledHashSemiAntiJoinTest.xml
index 8f29cd63d08..00927a6bb00 100644
--- 
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/join/ShuffledHashSemiAntiJoinTest.xml
+++ 
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/join/ShuffledHashSemiAntiJoinTest.xml
@@ -800,7 +800,7 @@ HashJoin(joinType=[LeftSemiJoin], where=[((b = rk) AND (a 
<> d))], select=[a, b,
 :  +- TableSourceScan(table=[[default_catalog, default_database, l]], 
fields=[a, b, c])
 +- Exchange(distribution=[hash[rk]])
    +- Calc(select=[w0$o0 AS rk, d])
-      +- OverAggregate(partitionBy=[d], orderBy=[e ASC], window#0=[RANK(*) AS 
w0$o0 RANG BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], select=[d, e, w0$o0])
+      +- OverAggregate(partitionBy=[d], orderBy=[e ASC], window#0=[RANK(*) AS 
w0$o0 RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], select=[d, e, w0$o0])
          +- Exchange(distribution=[forward])
             +- Sort(orderBy=[d ASC, e ASC])
                +- Exchange(distribution=[hash[d]])
@@ -1291,11 +1291,11 @@ HashJoin(joinType=[LeftSemiJoin], where=[((a = $0) AND 
(b = $1))], select=[a, b,
 :  +- TableSourceScan(table=[[default_catalog, default_database, l]], 
fields=[a, b, c])
 +- Exchange(distribution=[hash[$0, $1]])
    +- Calc(select=[w0$o0 AS $0, w1$o0 AS $1])
-      +- OverAggregate(partitionBy=[f], orderBy=[d ASC], window#0=[MIN(e) AS 
w1$o0 RANG BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], select=[d, e, f, 
w0$o0, w1$o0])
+      +- OverAggregate(partitionBy=[f], orderBy=[d ASC], window#0=[MIN(e) AS 
w1$o0 RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], select=[d, e, f, 
w0$o0, w1$o0])
          +- Exchange(distribution=[forward])
             +- Sort(orderBy=[f ASC, d ASC])
                +- Exchange(distribution=[hash[f]])
-                  +- OverAggregate(window#0=[MAX(d) AS w0$o0 RANG BETWEEN 
UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING], select=[d, e, f, w0$o0])
+                  +- OverAggregate(window#0=[MAX(d) AS w0$o0 RANGE BETWEEN 
UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING], select=[d, e, f, w0$o0])
                      +- Exchange(distribution=[single])
                         +- TableSourceScan(table=[[default_catalog, 
default_database, r]], fields=[d, e, f])
 ]]>
diff --git 
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/join/SortMergeSemiAntiJoinTest.xml
 
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/join/SortMergeSemiAntiJoinTest.xml
index fd85c5db67a..17b7e1066d9 100644
--- 
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/join/SortMergeSemiAntiJoinTest.xml
+++ 
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/join/SortMergeSemiAntiJoinTest.xml
@@ -800,7 +800,7 @@ SortMergeJoin(joinType=[LeftSemiJoin], where=[((b = rk) AND 
(a <> d))], select=[
 :  +- TableSourceScan(table=[[default_catalog, default_database, l]], 
fields=[a, b, c])
 +- Exchange(distribution=[hash[rk]])
    +- Calc(select=[w0$o0 AS rk, d])
-      +- OverAggregate(partitionBy=[d], orderBy=[e ASC], window#0=[RANK(*) AS 
w0$o0 RANG BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], select=[d, e, w0$o0])
+      +- OverAggregate(partitionBy=[d], orderBy=[e ASC], window#0=[RANK(*) AS 
w0$o0 RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], select=[d, e, w0$o0])
          +- Exchange(distribution=[forward])
             +- Sort(orderBy=[d ASC, e ASC])
                +- Exchange(distribution=[hash[d]])
@@ -1291,11 +1291,11 @@ SortMergeJoin(joinType=[LeftSemiJoin], where=[((a = $0) 
AND (b = $1))], select=[
 :  +- TableSourceScan(table=[[default_catalog, default_database, l]], 
fields=[a, b, c])
 +- Exchange(distribution=[hash[$0, $1]])
    +- Calc(select=[w0$o0 AS $0, w1$o0 AS $1])
-      +- OverAggregate(partitionBy=[f], orderBy=[d ASC], window#0=[MIN(e) AS 
w1$o0 RANG BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], select=[d, e, f, 
w0$o0, w1$o0])
+      +- OverAggregate(partitionBy=[f], orderBy=[d ASC], window#0=[MIN(e) AS 
w1$o0 RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], select=[d, e, f, 
w0$o0, w1$o0])
          +- Exchange(distribution=[forward])
             +- Sort(orderBy=[f ASC, d ASC])
                +- Exchange(distribution=[hash[f]])
-                  +- OverAggregate(window#0=[MAX(d) AS w0$o0 RANG BETWEEN 
UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING], select=[d, e, f, w0$o0])
+                  +- OverAggregate(window#0=[MAX(d) AS w0$o0 RANGE BETWEEN 
UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING], select=[d, e, f, w0$o0])
                      +- Exchange(distribution=[single])
                         +- TableSourceScan(table=[[default_catalog, 
default_database, r]], fields=[d, e, f])
 ]]>
diff --git 
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/table/PythonOverWindowAggregateTest.xml
 
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/table/PythonOverWindowAggregateTest.xml
index db2295d8a93..ceb66b66085 100644
--- 
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/table/PythonOverWindowAggregateTest.xml
+++ 
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/table/PythonOverWindowAggregateTest.xml
@@ -26,7 +26,7 @@ LogicalProject(b=[$1], 
_c1=[AS(*org.apache.flink.table.planner.runtime.utils.Jav
     <Resource name="optimized exec plan">
       <![CDATA[
 Calc(select=[b, w0$o0 AS _c1])
-+- PythonOverAggregate(partitionBy=[b], orderBy=[rowtime ASC], 
window#0=[PandasAggregateFunction(a, c) AS w0$o0 RANG BETWEEN UNBOUNDED 
PRECEDING AND CURRENT ROW], select=[a, b, c, rowtime, w0$o0])
++- PythonOverAggregate(partitionBy=[b], orderBy=[rowtime ASC], 
window#0=[PandasAggregateFunction(a, c) AS w0$o0 RANGE BETWEEN UNBOUNDED 
PRECEDING AND CURRENT ROW], select=[a, b, c, rowtime, w0$o0])
    +- Exchange(distribution=[forward])
       +- Sort(orderBy=[b ASC, rowtime ASC])
          +- Exchange(distribution=[hash[b]])
diff --git 
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/PythonOverAggregateJsonPlanTest_jsonplan/testProcTimeBoundedNonPartitionedRangeOver.out
 
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/PythonOverAggregateJsonPlanTest_jsonplan/testProcTimeBoundedNonPartitionedRangeOver.out
index bea3a1febca..6993dc3860e 100644
--- 
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/PythonOverAggregateJsonPlanTest_jsonplan/testProcTimeBoundedNonPartitionedRangeOver.out
+++ 
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/PythonOverAggregateJsonPlanTest_jsonplan/testProcTimeBoundedNonPartitionedRangeOver.out
@@ -327,7 +327,7 @@
         "fieldType" : "BIGINT"
       } ]
     },
-    "description" : "PythonOverAggregate(orderBy=[proctime ASC], window=[ RANG 
BETWEEN 10000 PRECEDING AND CURRENT ROW], select=[c, proctime, $2, pyFunc(c, c) 
AS w0$o0])"
+    "description" : "PythonOverAggregate(orderBy=[proctime ASC], window=[ 
RANGE BETWEEN 10000 PRECEDING AND CURRENT ROW], select=[c, proctime, $2, 
pyFunc(c, c) AS w0$o0])"
   }, {
     "id" : 7,
     "type" : "stream-exec-calc_1",
diff --git 
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/PythonOverAggregateJsonPlanTest_jsonplan/testProcTimeBoundedPartitionedRangeOver.out
 
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/PythonOverAggregateJsonPlanTest_jsonplan/testProcTimeBoundedPartitionedRangeOver.out
index 43e9007f0a4..85933d1df57 100644
--- 
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/PythonOverAggregateJsonPlanTest_jsonplan/testProcTimeBoundedPartitionedRangeOver.out
+++ 
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/PythonOverAggregateJsonPlanTest_jsonplan/testProcTimeBoundedPartitionedRangeOver.out
@@ -341,7 +341,7 @@
         "fieldType" : "BIGINT"
       } ]
     },
-    "description" : "PythonOverAggregate(partitionBy=[a], orderBy=[proctime 
ASC], window=[ RANG BETWEEN 7200000 PRECEDING AND CURRENT ROW], select=[a, c, 
proctime, $3, pyFunc(c, c) AS w0$o0])"
+    "description" : "PythonOverAggregate(partitionBy=[a], orderBy=[proctime 
ASC], window=[ RANGE BETWEEN 7200000 PRECEDING AND CURRENT ROW], select=[a, c, 
proctime, $3, pyFunc(c, c) AS w0$o0])"
   }, {
     "id" : 7,
     "type" : "stream-exec-calc_1",
diff --git 
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/PythonOverAggregateJsonPlanTest_jsonplan/testProcTimeUnboundedPartitionedRangeOver.out
 
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/PythonOverAggregateJsonPlanTest_jsonplan/testProcTimeUnboundedPartitionedRangeOver.out
index 51e3cd8bdc5..4480abaf70f 100644
--- 
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/PythonOverAggregateJsonPlanTest_jsonplan/testProcTimeUnboundedPartitionedRangeOver.out
+++ 
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/PythonOverAggregateJsonPlanTest_jsonplan/testProcTimeUnboundedPartitionedRangeOver.out
@@ -331,7 +331,7 @@
         "fieldType" : "BIGINT"
       } ]
     },
-    "description" : "PythonOverAggregate(partitionBy=[a], orderBy=[proctime 
ASC], window=[ RANG BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], select=[a, c, 
proctime, $3, pyFunc(c, c) AS w0$o0])"
+    "description" : "PythonOverAggregate(partitionBy=[a], orderBy=[proctime 
ASC], window=[ RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], select=[a, 
c, proctime, $3, pyFunc(c, c) AS w0$o0])"
   }, {
     "id" : 7,
     "type" : "stream-exec-calc_1",
diff --git 
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/rules/physical/batch/PushLocalAggIntoTableSourceScanRuleTest.xml
 
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/rules/physical/batch/PushLocalAggIntoTableSourceScanRuleTest.xml
index 00ec6819914..0c60d04880e 100644
--- 
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/rules/physical/batch/PushLocalAggIntoTableSourceScanRuleTest.xml
+++ 
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/rules/physical/batch/PushLocalAggIntoTableSourceScanRuleTest.xml
@@ -223,7 +223,7 @@ LogicalProject(id=[$0], amount=[$2], 
EXPR$2=[CASE(>(COUNT($3) OVER (PARTITION BY
     <Resource name="optimized rel plan">
       <![CDATA[
 Calc(select=[id, amount, CASE(>(w0$o0, 0), w0$o1, null:BIGINT) AS EXPR$2, 
name])
-+- OverAggregate(partitionBy=[name], window#0=[COUNT(price) AS w0$o0, 
$SUM0(price) AS w0$o1 RANG BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED 
FOLLOWING], select=[id, name, amount, price, w0$o0, w0$o1])
++- OverAggregate(partitionBy=[name], window#0=[COUNT(price) AS w0$o0, 
$SUM0(price) AS w0$o1 RANGE BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED 
FOLLOWING], select=[id, name, amount, price, w0$o0, w0$o1])
    +- Sort(orderBy=[name ASC])
       +- Exchange(distribution=[hash[name]])
          +- TableSourceScan(table=[[default_catalog, default_database, 
inventory, project=[id, name, amount, price], metadata=[]]], fields=[id, name, 
amount, price])
diff --git 
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/MatchRecognizeTest.xml
 
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/MatchRecognizeTest.xml
index 1877061f413..3331876c2bd 100644
--- 
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/MatchRecognizeTest.xml
+++ 
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/MatchRecognizeTest.xml
@@ -152,7 +152,7 @@ LogicalProject(symbol=[$0], price=[$1], tax=[$2], 
matchRowtime=[$3], price_sum=[
     <Resource name="optimized rel plan">
       <![CDATA[
 Calc(select=[symbol, price, tax, matchRowtime, CASE(>(w0$o0, 0), w0$o1, 
null:INTEGER) AS price_sum]), rowType=[RecordType(VARCHAR(2147483647) symbol, 
INTEGER price, INTEGER tax, TIMESTAMP_LTZ(3) *ROWTIME* matchRowtime, INTEGER 
price_sum)]
-+- OverAggregate(partitionBy=[symbol], orderBy=[matchRowtime ASC], window=[ 
RANG BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], select=[symbol, price, tax, 
matchRowtime, COUNT(price) AS w0$o0, $SUM0(price) AS w0$o1]), 
rowType=[RecordType(VARCHAR(2147483647) symbol, INTEGER price, INTEGER tax, 
TIMESTAMP_LTZ(3) *ROWTIME* matchRowtime, BIGINT w0$o0, INTEGER w0$o1)]
++- OverAggregate(partitionBy=[symbol], orderBy=[matchRowtime ASC], window=[ 
RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], select=[symbol, price, tax, 
matchRowtime, COUNT(price) AS w0$o0, $SUM0(price) AS w0$o1]), 
rowType=[RecordType(VARCHAR(2147483647) symbol, INTEGER price, INTEGER tax, 
TIMESTAMP_LTZ(3) *ROWTIME* matchRowtime, BIGINT w0$o0, INTEGER w0$o1)]
    +- Exchange(distribution=[hash[symbol]]), 
rowType=[RecordType(VARCHAR(2147483647) symbol, INTEGER price, INTEGER tax, 
TIMESTAMP_LTZ(3) *ROWTIME* matchRowtime)]
       +- Match(partitionBy=[symbol], orderBy=[ts_ltz ASC], 
measures=[FINAL(A.price) AS price, FINAL(A.tax) AS tax, 
FINAL(MATCH_ROWTIME(*.ts_ltz)) AS matchRowtime], rowsPerMatch=[ONE ROW PER 
MATCH], after=[SKIP TO NEXT ROW], pattern=[_UTF-16LE'A'], 
define=[{A=>(PREV(A.$2, 0), 0)}]), rowType=[RecordType(VARCHAR(2147483647) 
symbol, INTEGER price, INTEGER tax, TIMESTAMP_LTZ(3) *ROWTIME* matchRowtime)]
          +- Exchange(distribution=[hash[symbol]]), 
rowType=[RecordType(VARCHAR(2147483647) symbol, TIMESTAMP_LTZ(3) *ROWTIME* 
ts_ltz, INTEGER price, INTEGER tax)]
diff --git 
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/SubplanReuseTest.xml
 
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/SubplanReuseTest.xml
index e1f9644f71e..5b5622d341b 100644
--- 
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/SubplanReuseTest.xml
+++ 
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/SubplanReuseTest.xml
@@ -752,7 +752,7 @@ LogicalProject(a=[$0], b=[$1], EXPR$2=[$2], a0=[$3], 
b0=[$4], EXPR$20=[$5])
 Join(joinType=[InnerJoin], where=[(a = a0)], select=[a, b, w0$o0, a0, b0, 
w0$o00], leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey])
 :- Exchange(distribution=[hash[a]])
 :  +- Calc(select=[a, b, w0$o0], where=[(b < 100)])
-:     +- OverAggregate(orderBy=[c DESC], window=[ RANG BETWEEN UNBOUNDED 
PRECEDING AND CURRENT ROW], select=[a, b, c, RANK(*) AS w0$o0])(reuse_id=[1])
+:     +- OverAggregate(orderBy=[c DESC], window=[ RANGE BETWEEN UNBOUNDED 
PRECEDING AND CURRENT ROW], select=[a, b, c, RANK(*) AS w0$o0])(reuse_id=[1])
 :        +- Exchange(distribution=[single])
 :           +- TableSourceScan(table=[[default_catalog, default_database, x]], 
fields=[a, b, c])
 +- Exchange(distribution=[hash[a]])
@@ -821,7 +821,7 @@ LogicalProject(a=[$0], b=[$1], EXPR$2=[$2], a0=[$3], 
b0=[$4], EXPR$20=[$5])
 Join(joinType=[InnerJoin], where=[(a = a0)], select=[a, b, w0$o0, a0, b0, 
w0$o00], leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey])
 :- Exchange(distribution=[hash[a]])
 :  +- Calc(select=[a, b, w0$o0], where=[(b < 100)])
-:     +- OverAggregate(partitionBy=[c], orderBy=[c DESC], window=[ RANG 
BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], select=[a, b, c, MyFirst(c) AS 
w0$o0])(reuse_id=[1])
+:     +- OverAggregate(partitionBy=[c], orderBy=[c DESC], window=[ RANGE 
BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], select=[a, b, c, MyFirst(c) AS 
w0$o0])(reuse_id=[1])
 :        +- Exchange(distribution=[hash[c]])
 :           +- TableSourceScan(table=[[default_catalog, default_database, x]], 
fields=[a, b, c])
 +- Exchange(distribution=[hash[a]])
diff --git 
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/agg/OverAggregateTest.xml
 
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/agg/OverAggregateTest.xml
index 083cb10cf0e..35c13da188e 100644
--- 
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/agg/OverAggregateTest.xml
+++ 
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/agg/OverAggregateTest.xml
@@ -42,9 +42,9 @@ LogicalProject(a=[$0], b=[$1], ts=[$2], c1=[$3], c2=[$4])
     </Resource>
     <Resource name="optimized exec plan">
       <![CDATA[
-OverAggregate(partitionBy=[a], orderBy=[ts ASC], window=[ RANG BETWEEN 
UNBOUNDED PRECEDING AND CURRENT ROW], select=[a, b, ts, w0$o0, COUNT(*) AS 
w0$o0_0])
+OverAggregate(partitionBy=[a], orderBy=[ts ASC], window=[ RANGE BETWEEN 
UNBOUNDED PRECEDING AND CURRENT ROW], select=[a, b, ts, w0$o0, COUNT(*) AS 
w0$o0_0])
 +- Exchange(distribution=[hash[a]])
-   +- OverAggregate(partitionBy=[a, b], orderBy=[ts ASC], window=[ RANG 
BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], select=[a, b, ts, COUNT(*) AS 
w0$o0])
+   +- OverAggregate(partitionBy=[a, b], orderBy=[ts ASC], window=[ RANGE 
BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], select=[a, b, ts, COUNT(*) AS 
w0$o0])
       +- Exchange(distribution=[hash[a, b]])
          +- WatermarkAssigner(rowtime=[ts], watermark=[ts])
             +- TableSourceScan(table=[[default_catalog, default_database, 
src]], fields=[a, b, ts])
@@ -127,7 +127,7 @@ LogicalProject(a=[$0], EXPR$1=[COUNT($2) OVER (ORDER BY $3 
NULLS FIRST RANGE 100
     <Resource name="optimized exec plan">
       <![CDATA[
 Calc(select=[a, w0$o0 AS $1])
-+- OverAggregate(orderBy=[proctime ASC], window=[ RANG BETWEEN 10000 PRECEDING 
AND CURRENT ROW], select=[a, c, proctime, COUNT(c) AS w0$o0])
++- OverAggregate(orderBy=[proctime ASC], window=[ RANGE BETWEEN 10000 
PRECEDING AND CURRENT ROW], select=[a, c, proctime, COUNT(c) AS w0$o0])
    +- Exchange(distribution=[single])
       +- Calc(select=[a, c, proctime])
          +- DataStreamScan(table=[[default_catalog, default_database, 
MyTable]], fields=[a, b, c, proctime, rowtime])
@@ -176,7 +176,7 @@ LogicalProject(a=[$0], avgA=[/(CASE(>(COUNT($2) OVER 
(PARTITION BY $0 ORDER BY $
     <Resource name="optimized exec plan">
       <![CDATA[
 Calc(select=[a, (CASE((w0$o0 > 0), w0$o1, null:BIGINT) / w0$o0) AS avgA])
-+- OverAggregate(partitionBy=[a], orderBy=[proctime ASC], window=[ RANG 
BETWEEN 7200000 PRECEDING AND CURRENT ROW], select=[a, c, proctime, COUNT(c) AS 
w0$o0, $SUM0(c) AS w0$o1])
++- OverAggregate(partitionBy=[a], orderBy=[proctime ASC], window=[ RANGE 
BETWEEN 7200000 PRECEDING AND CURRENT ROW], select=[a, c, proctime, COUNT(c) AS 
w0$o0, $SUM0(c) AS w0$o1])
    +- Exchange(distribution=[hash[a]])
       +- Calc(select=[a, c, proctime])
          +- DataStreamScan(table=[[default_catalog, default_database, 
MyTable]], fields=[a, b, c, proctime, rowtime])
@@ -248,7 +248,7 @@ LogicalProject(c=[$2], cnt1=[COUNT($0) OVER (ORDER BY $3 
NULLS FIRST)], cnt2=[CA
     <Resource name="optimized exec plan">
       <![CDATA[
 Calc(select=[c, w0$o0 AS cnt1, CASE((w0$o0 > 0), w0$o1, null:INTEGER) AS cnt2])
-+- OverAggregate(orderBy=[proctime ASC], window=[ RANG BETWEEN UNBOUNDED 
PRECEDING AND CURRENT ROW], select=[a, c, proctime, COUNT(a) AS w0$o0, $SUM0(a) 
AS w0$o1])
++- OverAggregate(orderBy=[proctime ASC], window=[ RANGE BETWEEN UNBOUNDED 
PRECEDING AND CURRENT ROW], select=[a, c, proctime, COUNT(a) AS w0$o0, $SUM0(a) 
AS w0$o1])
    +- Exchange(distribution=[single])
       +- Calc(select=[a, c, proctime])
          +- DataStreamScan(table=[[default_catalog, default_database, 
MyTable]], fields=[a, b, c, proctime, rowtime])
@@ -296,7 +296,7 @@ LogicalProject(c=[$2], cnt1=[COUNT($0) OVER (PARTITION BY 
$2 ORDER BY $3 NULLS F
     <Resource name="optimized exec plan">
       <![CDATA[
 Calc(select=[c, w0$o0 AS cnt1, CASE((w0$o0 > 0), w0$o1, null:INTEGER) AS cnt2])
-+- OverAggregate(partitionBy=[c], orderBy=[proctime ASC], window=[ RANG 
BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], select=[a, c, proctime, COUNT(a) 
AS w0$o0, $SUM0(a) AS w0$o1])
++- OverAggregate(partitionBy=[c], orderBy=[proctime ASC], window=[ RANGE 
BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], select=[a, c, proctime, COUNT(a) 
AS w0$o0, $SUM0(a) AS w0$o1])
    +- Exchange(distribution=[hash[c]])
       +- Calc(select=[a, c, proctime])
          +- DataStreamScan(table=[[default_catalog, default_database, 
MyTable]], fields=[a, b, c, proctime, rowtime])
@@ -345,7 +345,7 @@ LogicalProject(c=[$2], cnt1=[COUNT($0) OVER (ORDER BY $4 
NULLS FIRST RANGE 1000:
     <Resource name="optimized exec plan">
       <![CDATA[
 Calc(select=[c, w0$o0 AS $1])
-+- OverAggregate(orderBy=[rowtime ASC], window=[ RANG BETWEEN 1000 PRECEDING 
AND CURRENT ROW], select=[a, c, rowtime, COUNT(a) AS w0$o0])
++- OverAggregate(orderBy=[rowtime ASC], window=[ RANGE BETWEEN 1000 PRECEDING 
AND CURRENT ROW], select=[a, c, rowtime, COUNT(a) AS w0$o0])
    +- Exchange(distribution=[single])
       +- Calc(select=[a, c, rowtime])
          +- DataStreamScan(table=[[default_catalog, default_database, 
MyTable]], fields=[a, b, c, proctime, rowtime])
@@ -394,7 +394,7 @@ LogicalProject(c=[$2], EXPR$1=[COUNT($0) OVER (PARTITION BY 
$2 ORDER BY $4 NULLS
     <Resource name="optimized exec plan">
       <![CDATA[
 Calc(select=[c, w0$o0 AS $1])
-+- OverAggregate(partitionBy=[c], orderBy=[rowtime ASC], window=[ RANG BETWEEN 
1000 PRECEDING AND CURRENT ROW], select=[a, c, rowtime, COUNT(a) AS w0$o0])
++- OverAggregate(partitionBy=[c], orderBy=[rowtime ASC], window=[ RANGE 
BETWEEN 1000 PRECEDING AND CURRENT ROW], select=[a, c, rowtime, COUNT(a) AS 
w0$o0])
    +- Exchange(distribution=[hash[c]])
       +- Calc(select=[a, c, rowtime])
          +- DataStreamScan(table=[[default_catalog, default_database, 
MyTable]], fields=[a, b, c, proctime, rowtime])
@@ -444,7 +444,7 @@ LogicalProject(c=[$2], cnt1=[COUNT($0) OVER (ORDER BY $4 
NULLS FIRST)], cnt2=[CA
     <Resource name="optimized exec plan">
       <![CDATA[
 Calc(select=[c, w0$o0 AS cnt1, CASE((w0$o0 > 0), w0$o1, null:INTEGER) AS cnt2])
-+- OverAggregate(orderBy=[rowtime ASC], window=[ RANG BETWEEN UNBOUNDED 
PRECEDING AND CURRENT ROW], select=[a, c, rowtime, COUNT(a) AS w0$o0, $SUM0(a) 
AS w0$o1])
++- OverAggregate(orderBy=[rowtime ASC], window=[ RANGE BETWEEN UNBOUNDED 
PRECEDING AND CURRENT ROW], select=[a, c, rowtime, COUNT(a) AS w0$o0, $SUM0(a) 
AS w0$o1])
    +- Exchange(distribution=[single])
       +- Calc(select=[a, c, rowtime])
          +- DataStreamScan(table=[[default_catalog, default_database, 
MyTable]], fields=[a, b, c, proctime, rowtime])
@@ -494,7 +494,7 @@ LogicalProject(c=[$2], cnt1=[COUNT($0) OVER (PARTITION BY 
$2 ORDER BY $4 NULLS F
     <Resource name="optimized exec plan">
       <![CDATA[
 Calc(select=[c, w0$o0 AS cnt1, CASE((w0$o0 > 0), w0$o1, null:INTEGER) AS cnt2])
-+- OverAggregate(partitionBy=[c], orderBy=[rowtime ASC], window=[ RANG BETWEEN 
UNBOUNDED PRECEDING AND CURRENT ROW], select=[a, c, rowtime, COUNT(a) AS w0$o0, 
$SUM0(a) AS w0$o1])
++- OverAggregate(partitionBy=[c], orderBy=[rowtime ASC], window=[ RANGE 
BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], select=[a, c, rowtime, COUNT(a) 
AS w0$o0, $SUM0(a) AS w0$o1])
    +- Exchange(distribution=[hash[c]])
       +- Calc(select=[a, c, rowtime])
          +- DataStreamScan(table=[[default_catalog, default_database, 
MyTable]], fields=[a, b, c, proctime, rowtime])
@@ -593,7 +593,7 @@ LogicalProject(c=[$2], cnt1=[COUNT($0) OVER (PARTITION BY 
$2)])
     <Resource name="optimized exec plan">
       <![CDATA[
 Calc(select=[c, w0$o0 AS $1])
-+- OverAggregate(partitionBy=[c], orderBy=[], window=[ RANG BETWEEN UNBOUNDED 
PRECEDING AND UNBOUNDED FOLLOWING], select=[a, b, c, proctime, rowtime, 
COUNT(a) AS w0$o0])
++- OverAggregate(partitionBy=[c], orderBy=[], window=[ RANGE BETWEEN UNBOUNDED 
PRECEDING AND UNBOUNDED FOLLOWING], select=[a, b, c, proctime, rowtime, 
COUNT(a) AS w0$o0])
    +- Exchange(distribution=[hash[c]])
       +- DataStreamScan(table=[[default_catalog, default_database, MyTable]], 
fields=[a, b, c, proctime, rowtime])
 ]]>
diff --git 
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/agg/WindowAggregateTest.xml
 
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/agg/WindowAggregateTest.xml
index ca722ace653..b3dbc3a2ca4 100644
--- 
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/agg/WindowAggregateTest.xml
+++ 
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/agg/WindowAggregateTest.xml
@@ -2636,7 +2636,7 @@ Calc(select=[c, EXPR$1, EXPR$2])
 +- GroupAggregate(groupBy=[window_start, window_end, c, window_time], 
select=[window_start, window_end, c, window_time, MAX(c1) AS EXPR$1, COUNT(a) 
AS EXPR$2])
    +- Exchange(distribution=[hash[window_start, window_end, c, window_time]])
       +- Calc(select=[window_start, window_end, c, 
PROCTIME_MATERIALIZE(window_time) AS window_time, w0$o0 AS c1, a])
-         +- OverAggregate(partitionBy=[c], orderBy=[proctime DESC], window=[ 
RANG BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], select=[a, b, c, d, e, 
rowtime, proctime, window_start, window_end, window_time, COUNT(*) AS w0$o0])
+         +- OverAggregate(partitionBy=[c], orderBy=[proctime DESC], window=[ 
RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], select=[a, b, c, d, e, 
rowtime, proctime, window_start, window_end, window_time, COUNT(*) AS w0$o0])
             +- Exchange(distribution=[hash[c]])
                +- WindowTableFunction(window=[CUMULATE(time_col=[proctime], 
max_size=[5 min], step=[10 s])])
                   +- WatermarkAssigner(rowtime=[rowtime], 
watermark=[-(rowtime, 1000:INTERVAL SECOND)])
@@ -2676,7 +2676,7 @@ Calc(select=[c, EXPR$1, EXPR$2])
 +- GroupAggregate(groupBy=[window_start, window_end, c, window_time], 
select=[window_start, window_end, c, window_time, MAX(c1) AS EXPR$1, COUNT(a) 
AS EXPR$2])
    +- Exchange(distribution=[hash[window_start, window_end, c, window_time]])
       +- Calc(select=[window_start, window_end, c, 
PROCTIME_MATERIALIZE(window_time) AS window_time, w0$o0 AS c1, a])
-         +- OverAggregate(partitionBy=[c], orderBy=[proctime DESC], window=[ 
RANG BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], select=[a, b, c, d, e, 
rowtime, proctime, window_start, window_end, window_time, COUNT(*) AS w0$o0])
+         +- OverAggregate(partitionBy=[c], orderBy=[proctime DESC], window=[ 
RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], select=[a, b, c, d, e, 
rowtime, proctime, window_start, window_end, window_time, COUNT(*) AS w0$o0])
             +- Exchange(distribution=[hash[c]])
                +- WindowTableFunction(window=[CUMULATE(time_col=[proctime], 
max_size=[5 min], step=[10 s])])
                   +- WatermarkAssigner(rowtime=[rowtime], 
watermark=[-(rowtime, 1000:INTERVAL SECOND)])
diff --git 
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/join/SemiAntiJoinTest.xml
 
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/join/SemiAntiJoinTest.xml
index 93f8ca912f5..cc42ab3cc40 100644
--- 
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/join/SemiAntiJoinTest.xml
+++ 
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/join/SemiAntiJoinTest.xml
@@ -949,7 +949,7 @@ Join(joinType=[LeftSemiJoin], where=[((b = rk) AND (a <> 
d))], select=[a, b, c],
 :  +- TableSourceScan(table=[[default_catalog, default_database, l]], 
fields=[a, b, c])
 +- Exchange(distribution=[hash[rk]])
    +- Calc(select=[w0$o0 AS rk, d])
-      +- OverAggregate(partitionBy=[d], orderBy=[e ASC], window=[ RANG BETWEEN 
UNBOUNDED PRECEDING AND CURRENT ROW], select=[d, e, f, RANK(*) AS w0$o0])
+      +- OverAggregate(partitionBy=[d], orderBy=[e ASC], window=[ RANGE 
BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], select=[d, e, f, RANK(*) AS 
w0$o0])
          +- Exchange(distribution=[hash[d]])
             +- TableSourceScan(table=[[default_catalog, default_database, r]], 
fields=[d, e, f])
 ]]>
@@ -1488,7 +1488,7 @@ Join(joinType=[LeftSemiJoin], where=[((a = $0) AND (b = 
$1))], select=[a, b, c],
 :  +- TableSourceScan(table=[[default_catalog, default_database, l]], 
fields=[a, b, c])
 +- Exchange(distribution=[hash[$0, $1]])
    +- Calc(select=[w0$o0 AS $0, w0$o1 AS $1])
-      +- OverAggregate(partitionBy=[f], orderBy=[d ASC], window=[ RANG BETWEEN 
UNBOUNDED PRECEDING AND CURRENT ROW], select=[d, e, f, MAX(d) AS w0$o0, MIN(e) 
AS w0$o1])
+      +- OverAggregate(partitionBy=[f], orderBy=[d ASC], window=[ RANGE 
BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], select=[d, e, f, MAX(d) AS w0$o0, 
MIN(e) AS w0$o1])
          +- Exchange(distribution=[hash[f]])
             +- TableSourceScan(table=[[default_catalog, default_database, r]], 
fields=[d, e, f])
 ]]>
diff --git 
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/table/OverAggregateTest.xml
 
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/table/OverAggregateTest.xml
index 1cf5b98af65..3047a06dfcd 100644
--- 
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/table/OverAggregateTest.xml
+++ 
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/table/OverAggregateTest.xml
@@ -26,7 +26,7 @@ LogicalProject(a=[$0], _c1=[AS(COUNT($2) OVER (ORDER BY $3 
NULLS FIRST RANGE 100
     <Resource name="optimized exec plan">
       <![CDATA[
 Calc(select=[a, w0$o0 AS _c1])
-+- OverAggregate(orderBy=[proctime ASC], window=[ RANG BETWEEN 10000 PRECEDING 
AND CURRENT ROW], select=[a, c, proctime, COUNT(c) AS w0$o0])
++- OverAggregate(orderBy=[proctime ASC], window=[ RANGE BETWEEN 10000 
PRECEDING AND CURRENT ROW], select=[a, c, proctime, COUNT(c) AS w0$o0])
    +- Exchange(distribution=[single])
       +- Calc(select=[a, c, proctime])
          +- DataStreamScan(table=[[default_catalog, default_database, 
MyTable]], fields=[a, b, c, proctime, rowtime])
@@ -60,7 +60,7 @@ LogicalProject(a=[$0], 
myAvg=[AS(*org.apache.flink.table.planner.plan.utils.Java
     <Resource name="optimized exec plan">
       <![CDATA[
 Calc(select=[a, w0$o0 AS myAvg])
-+- OverAggregate(partitionBy=[a], orderBy=[proctime ASC], window=[ RANG 
BETWEEN 7200000 PRECEDING AND CURRENT ROW], select=[a, c, proctime, $3, 
*org.apache.flink.table.planner.plan.utils.JavaUserDefinedAggFunctions$WeightedAvgWithRetract*(c,
 $3) AS w0$o0])
++- OverAggregate(partitionBy=[a], orderBy=[proctime ASC], window=[ RANGE 
BETWEEN 7200000 PRECEDING AND CURRENT ROW], select=[a, c, proctime, $3, 
*org.apache.flink.table.planner.plan.utils.JavaUserDefinedAggFunctions$WeightedAvgWithRetract*(c,
 $3) AS w0$o0])
    +- Exchange(distribution=[hash[a]])
       +- Calc(select=[a, c, proctime, CAST(a AS BIGINT) AS $3])
          +- DataStreamScan(table=[[default_catalog, default_database, 
MyTable]], fields=[a, b, c, proctime, rowtime])
@@ -94,7 +94,7 @@ LogicalProject(a=[$0], c=[$2], _c2=[AS(COUNT($0) OVER (ORDER 
BY $3 NULLS FIRST),
     <Resource name="optimized exec plan">
       <![CDATA[
 Calc(select=[a, c, w0$o0 AS _c2, w0$o1 AS _c3])
-+- OverAggregate(orderBy=[proctime ASC], window=[ RANG BETWEEN UNBOUNDED 
PRECEDING AND CURRENT ROW], select=[a, c, proctime, COUNT(a) AS w0$o0, SUM(a) 
AS w0$o1])
++- OverAggregate(orderBy=[proctime ASC], window=[ RANGE BETWEEN UNBOUNDED 
PRECEDING AND CURRENT ROW], select=[a, c, proctime, COUNT(a) AS w0$o0, SUM(a) 
AS w0$o1])
    +- Exchange(distribution=[single])
       +- Calc(select=[a, c, proctime])
          +- DataStreamScan(table=[[default_catalog, default_database, 
MyTable]], fields=[a, b, c, proctime, rowtime])
@@ -128,7 +128,7 @@ LogicalProject(a=[$0], c=[$2], _c2=[AS(COUNT($0) OVER 
(PARTITION BY $2 ORDER BY
     <Resource name="optimized exec plan">
       <![CDATA[
 Calc(select=[a, c, w0$o0 AS _c2, w0$o1 AS _c3])
-+- OverAggregate(partitionBy=[c], orderBy=[proctime ASC], window=[ RANG 
BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], select=[a, c, proctime, $3, 
COUNT(a) AS w0$o0, 
*org.apache.flink.table.planner.plan.utils.JavaUserDefinedAggFunctions$WeightedAvgWithRetract*(c,
 $3) AS w0$o1])
++- OverAggregate(partitionBy=[c], orderBy=[proctime ASC], window=[ RANGE 
BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], select=[a, c, proctime, $3, 
COUNT(a) AS w0$o0, 
*org.apache.flink.table.planner.plan.utils.JavaUserDefinedAggFunctions$WeightedAvgWithRetract*(c,
 $3) AS w0$o1])
    +- Exchange(distribution=[hash[c]])
       +- Calc(select=[a, c, proctime, CAST(a AS BIGINT) AS $3])
          +- DataStreamScan(table=[[default_catalog, default_database, 
MyTable]], fields=[a, b, c, proctime, rowtime])
@@ -162,7 +162,7 @@ LogicalProject(c=[$2], _c1=[AS(COUNT(DISTINCT $0) OVER 
(PARTITION BY $2 ORDER BY
     <Resource name="optimized exec plan">
       <![CDATA[
 Calc(select=[c, w0$o0 AS _c1, w0$o1 AS _c2, w0$o2 AS _c3])
-+- OverAggregate(partitionBy=[c], orderBy=[rowtime ASC], window=[ RANG BETWEEN 
7200000 PRECEDING AND CURRENT ROW], select=[a, c, rowtime, $3, COUNT(DISTINCT 
a) AS w0$o0, SUM(DISTINCT a) AS w0$o1, AVG(DISTINCT $3) AS w0$o2])
++- OverAggregate(partitionBy=[c], orderBy=[rowtime ASC], window=[ RANGE 
BETWEEN 7200000 PRECEDING AND CURRENT ROW], select=[a, c, rowtime, $3, 
COUNT(DISTINCT a) AS w0$o0, SUM(DISTINCT a) AS w0$o1, AVG(DISTINCT $3) AS 
w0$o2])
    +- Exchange(distribution=[hash[c]])
       +- Calc(select=[a, c, rowtime, CAST(a AS FLOAT) AS $3])
          +- DataStreamScan(table=[[default_catalog, default_database, 
MyTable]], fields=[a, b, c, proctime, rowtime])
@@ -196,7 +196,7 @@ LogicalProject(a=[$0], _c1=[AS(COUNT($2) OVER (ORDER BY $4 
NULLS FIRST RANGE 100
     <Resource name="optimized exec plan">
       <![CDATA[
 Calc(select=[a, w0$o0 AS _c1])
-+- OverAggregate(orderBy=[rowtime ASC], window=[ RANG BETWEEN 10000 PRECEDING 
AND CURRENT ROW], select=[a, c, rowtime, COUNT(c) AS w0$o0])
++- OverAggregate(orderBy=[rowtime ASC], window=[ RANGE BETWEEN 10000 PRECEDING 
AND CURRENT ROW], select=[a, c, rowtime, COUNT(c) AS w0$o0])
    +- Exchange(distribution=[single])
       +- Calc(select=[a, c, rowtime])
          +- DataStreamScan(table=[[default_catalog, default_database, 
MyTable]], fields=[a, b, c, proctime, rowtime])
@@ -213,7 +213,7 @@ LogicalProject(a=[$0], _c1=[AS(AVG($2) OVER (PARTITION BY 
$0 ORDER BY $4 NULLS F
     <Resource name="optimized exec plan">
       <![CDATA[
 Calc(select=[a, w0$o0 AS _c1, w0$o1 AS wAvg])
-+- OverAggregate(partitionBy=[a], orderBy=[rowtime ASC], window=[ RANG BETWEEN 
7200000 PRECEDING AND CURRENT ROW], select=[a, c, rowtime, $3, AVG(c) AS w0$o0, 
*org.apache.flink.table.planner.plan.utils.JavaUserDefinedAggFunctions$WeightedAvgWithRetract*(c,
 $3) AS w0$o1])
++- OverAggregate(partitionBy=[a], orderBy=[rowtime ASC], window=[ RANGE 
BETWEEN 7200000 PRECEDING AND CURRENT ROW], select=[a, c, rowtime, $3, AVG(c) 
AS w0$o0, 
*org.apache.flink.table.planner.plan.utils.JavaUserDefinedAggFunctions$WeightedAvgWithRetract*(c,
 $3) AS w0$o1])
    +- Exchange(distribution=[hash[a]])
       +- Calc(select=[a, c, rowtime, CAST(a AS BIGINT) AS $3])
          +- DataStreamScan(table=[[default_catalog, default_database, 
MyTable]], fields=[a, b, c, proctime, rowtime])
@@ -264,7 +264,7 @@ LogicalProject(c=[$2], _c1=[AS(COUNT(DISTINCT $0) OVER 
(PARTITION BY $2 ORDER BY
     <Resource name="optimized exec plan">
       <![CDATA[
 Calc(select=[c, w0$o0 AS _c1, w0$o1 AS _c2, w0$o2 AS _c3])
-+- OverAggregate(partitionBy=[c], orderBy=[rowtime ASC], window=[ RANG BETWEEN 
UNBOUNDED PRECEDING AND CURRENT ROW], select=[a, c, rowtime, $3, COUNT(DISTINCT 
a) AS w0$o0, SUM(DISTINCT a) AS w0$o1, AVG(DISTINCT $3) AS w0$o2])
++- OverAggregate(partitionBy=[c], orderBy=[rowtime ASC], window=[ RANGE 
BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], select=[a, c, rowtime, $3, 
COUNT(DISTINCT a) AS w0$o0, SUM(DISTINCT a) AS w0$o1, AVG(DISTINCT $3) AS 
w0$o2])
    +- Exchange(distribution=[hash[c]])
       +- Calc(select=[a, c, rowtime, CAST(a AS FLOAT) AS $3])
          +- DataStreamScan(table=[[default_catalog, default_database, 
MyTable]], fields=[a, b, c, proctime, rowtime])
@@ -298,7 +298,7 @@ LogicalProject(a=[$0], c=[$2], _c2=[AS(COUNT($0) OVER 
(ORDER BY $4 NULLS FIRST),
     <Resource name="optimized exec plan">
       <![CDATA[
 Calc(select=[a, c, w0$o0 AS _c2, w0$o1 AS _c3])
-+- OverAggregate(orderBy=[rowtime ASC], window=[ RANG BETWEEN UNBOUNDED 
PRECEDING AND CURRENT ROW], select=[a, c, rowtime, COUNT(a) AS w0$o0, SUM(a) AS 
w0$o1])
++- OverAggregate(orderBy=[rowtime ASC], window=[ RANGE BETWEEN UNBOUNDED 
PRECEDING AND CURRENT ROW], select=[a, c, rowtime, COUNT(a) AS w0$o0, SUM(a) AS 
w0$o1])
    +- Exchange(distribution=[single])
       +- Calc(select=[a, c, rowtime])
          +- DataStreamScan(table=[[default_catalog, default_database, 
MyTable]], fields=[a, b, c, proctime, rowtime])
@@ -349,7 +349,7 @@ LogicalProject(a=[$0], c=[$2], _c2=[AS(COUNT($0) OVER 
(PARTITION BY $2 ORDER BY
     <Resource name="optimized exec plan">
       <![CDATA[
 Calc(select=[a, c, w0$o0 AS _c2, w0$o1 AS wAvg])
-+- OverAggregate(partitionBy=[c], orderBy=[rowtime ASC], window=[ RANG BETWEEN 
UNBOUNDED PRECEDING AND CURRENT ROW], select=[a, c, rowtime, $3, COUNT(a) AS 
w0$o0, 
*org.apache.flink.table.planner.plan.utils.JavaUserDefinedAggFunctions$WeightedAvgWithRetract*(c,
 $3) AS w0$o1])
++- OverAggregate(partitionBy=[c], orderBy=[rowtime ASC], window=[ RANGE 
BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], select=[a, c, rowtime, $3, 
COUNT(a) AS w0$o0, 
*org.apache.flink.table.planner.plan.utils.JavaUserDefinedAggFunctions$WeightedAvgWithRetract*(c,
 $3) AS w0$o1])
    +- Exchange(distribution=[hash[c]])
       +- Calc(select=[a, c, rowtime, CAST(a AS BIGINT) AS $3])
          +- DataStreamScan(table=[[default_catalog, default_database, 
MyTable]], fields=[a, b, c, proctime, rowtime])
diff --git 
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/table/PythonOverWindowAggregateTest.xml
 
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/table/PythonOverWindowAggregateTest.xml
index 296dd7cbbf8..2126bcbb8f2 100644
--- 
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/table/PythonOverWindowAggregateTest.xml
+++ 
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/table/PythonOverWindowAggregateTest.xml
@@ -26,7 +26,7 @@ LogicalProject(b=[$1], 
_c1=[AS(*org.apache.flink.table.planner.runtime.utils.Jav
     <Resource name="optimized exec plan">
       <![CDATA[
 Calc(select=[b, w0$o0 AS _c1])
-+- PythonOverAggregate(partitionBy=[b], orderBy=[$3 ASC], window=[ RANG 
BETWEEN 10000 PRECEDING AND CURRENT ROW], select=[a, b, c, $3, 
PandasAggregateFunction(a, c) AS w0$o0])
++- PythonOverAggregate(partitionBy=[b], orderBy=[$3 ASC], window=[ RANGE 
BETWEEN 10000 PRECEDING AND CURRENT ROW], select=[a, b, c, $3, 
PandasAggregateFunction(a, c) AS w0$o0])
    +- Exchange(distribution=[hash[b]])
       +- Calc(select=[a, b, c, PROCTIME() AS $3])
          +- TableSourceScan(table=[[default_catalog, default_database, 
MyTable]], fields=[a, b, c])
@@ -61,7 +61,7 @@ LogicalProject(b=[$1], 
_c1=[AS(*org.apache.flink.table.planner.runtime.utils.Jav
     <Resource name="optimized exec plan">
       <![CDATA[
 Calc(select=[b, w0$o0 AS _c1])
-+- PythonOverAggregate(partitionBy=[b], orderBy=[rowtime ASC], window=[ RANG 
BETWEEN 10000 PRECEDING AND CURRENT ROW], select=[a, b, c, rowtime, 
PandasAggregateFunction(a, c) AS w0$o0])
++- PythonOverAggregate(partitionBy=[b], orderBy=[rowtime ASC], window=[ RANGE 
BETWEEN 10000 PRECEDING AND CURRENT ROW], select=[a, b, c, rowtime, 
PandasAggregateFunction(a, c) AS w0$o0])
    +- Exchange(distribution=[hash[b]])
       +- WatermarkAssigner(rowtime=[rowtime], watermark=[rowtime])
          +- TableSourceScan(table=[[default_catalog, default_database, 
MyTable]], fields=[a, b, c, rowtime])
diff --git 
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/table/TableSourceTest.xml
 
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/table/TableSourceTest.xml
index 7695b8010c3..b85eb7d846d 100644
--- 
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/table/TableSourceTest.xml
+++ 
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/table/TableSourceTest.xml
@@ -41,7 +41,7 @@ LogicalFilter(condition=[>($2, 100)])
     <Resource name="optimized exec plan">
       <![CDATA[
 Calc(select=[id, name, w0$o0 AS valSum], where=[(w0$o0 > 100)])
-+- OverAggregate(partitionBy=[id], orderBy=[$3 ASC], window=[ RANG BETWEEN 
7200000 PRECEDING AND CURRENT ROW], select=[id, val, name, $3, SUM(val) AS 
w0$o0])
++- OverAggregate(partitionBy=[id], orderBy=[$3 ASC], window=[ RANGE BETWEEN 
7200000 PRECEDING AND CURRENT ROW], select=[id, val, name, $3, SUM(val) AS 
w0$o0])
    +- Exchange(distribution=[hash[id]])
       +- Calc(select=[id, val, name, PROCTIME() AS $3])
          +- TableSourceScan(table=[[default_catalog, default_database, 
procTimeT]], fields=[id, val, name])
diff --git 
a/flink-table/flink-table-planner/src/test/resources/restore-tests/batch-exec-over-aggregate_1/over-aggregate-unbounded-partitioned-rows/plan/over-aggregate-unbounded-partitioned-rows.json
 
b/flink-table/flink-table-planner/src/test/resources/restore-tests/batch-exec-over-aggregate_1/over-aggregate-unbounded-partitioned-rows/plan/over-aggregate-unbounded-partitioned-rows.json
index 1cc54126226..a0e738c5c70 100644
--- 
a/flink-table/flink-table-planner/src/test/resources/restore-tests/batch-exec-over-aggregate_1/over-aggregate-unbounded-partitioned-rows/plan/over-aggregate-unbounded-partitioned-rows.json
+++ 
b/flink-table/flink-table-planner/src/test/resources/restore-tests/batch-exec-over-aggregate_1/over-aggregate-unbounded-partitioned-rows/plan/over-aggregate-unbounded-partitioned-rows.json
@@ -257,7 +257,7 @@
       "priority" : 0
     } ],
     "outputType" : "ROW<`ts` BIGINT, `a` BIGINT, `b` INT, `c` 
VARCHAR(2147483647), `rowtime` TIMESTAMP(3), `$5` BIGINT NOT NULL, `w0$o0` 
BIGINT, `w0$o1` BIGINT NOT NULL, `w0$o2` BIGINT NOT NULL>",
-    "description" : "OverAggregate(partitionBy=[c], orderBy=[rowtime ASC], 
window#0=[LTCNT(a, $5) AS w0$o0, COUNT(a) AS w0$o1, $SUM0(a) AS w0$o2 RANG 
BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], select=[ts, a, b, c, rowtime, $5, 
w0$o0, w0$o1, w0$o2])"
+    "description" : "OverAggregate(partitionBy=[c], orderBy=[rowtime ASC], 
window#0=[LTCNT(a, $5) AS w0$o0, COUNT(a) AS w0$o1, $SUM0(a) AS w0$o2 RANGE 
BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], select=[ts, a, b, c, rowtime, $5, 
w0$o0, w0$o1, w0$o2])"
   }, {
     "id" : 6,
     "type" : "batch-exec-calc_1",
@@ -431,4 +431,4 @@
     },
     "shuffleMode" : "PIPELINED"
   } ]
-}
\ No newline at end of file
+}
diff --git 
a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-over-aggregate_1/over-aggregate-bounded-non-partitioned-rows-with-out-of-order-records/plan/over-aggregate-bounded-non-partitioned-rows-with-out-of-order-records.json
 
b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-over-aggregate_1/over-aggregate-bounded-non-partitioned-rows-with-out-of-order-records/plan/over-aggregate-bounded-non-partitioned-rows-with-out-of-order-records.json
index 4e73f861c70..7e61a62008f 100644
--- 
a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-over-aggregate_1/over-aggregate-bounded-non-partitioned-rows-with-out-of-order-records/plan/over-aggregate-bounded-non-partitioned-rows-with-out-of-order-records.json
+++ 
b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-over-aggregate_1/over-aggregate-bounded-non-partitioned-rows-with-out-of-order-records/plan/over-aggregate-bounded-non-partitioned-rows-with-out-of-order-records.json
@@ -357,7 +357,7 @@
         "fieldType" : "BIGINT NOT NULL"
       } ]
     },
-    "description" : "OverAggregate(partitionBy=[c], orderBy=[rowtime ASC], 
window=[ RANG BETWEEN 10000 PRECEDING AND CURRENT ROW], select=[ts, a, b, c, 
rowtime, $5, LTCNT(a, $5) AS w0$o0, COUNT(a) AS w0$o1, $SUM0(a) AS w0$o2])"
+    "description" : "OverAggregate(partitionBy=[c], orderBy=[rowtime ASC], 
window=[ RANGE BETWEEN 10000 PRECEDING AND CURRENT ROW], select=[ts, a, b, c, 
rowtime, $5, LTCNT(a, $5) AS w0$o0, COUNT(a) AS w0$o1, $SUM0(a) AS w0$o2])"
   }, {
     "id" : 15,
     "type" : "stream-exec-calc_1",
@@ -588,4 +588,4 @@
     },
     "shuffleMode" : "PIPELINED"
   } ]
-}
\ No newline at end of file
+}
diff --git 
a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-over-aggregate_1/over-aggregate-bounded-non-partitioned-rows/plan/over-aggregate-bounded-non-partitioned-rows.json
 
b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-over-aggregate_1/over-aggregate-bounded-non-partitioned-rows/plan/over-aggregate-bounded-non-partitioned-rows.json
index 2afb5fe7bb9..ac8365bb10d 100644
--- 
a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-over-aggregate_1/over-aggregate-bounded-non-partitioned-rows/plan/over-aggregate-bounded-non-partitioned-rows.json
+++ 
b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-over-aggregate_1/over-aggregate-bounded-non-partitioned-rows/plan/over-aggregate-bounded-non-partitioned-rows.json
@@ -356,7 +356,7 @@
         "fieldType" : "BIGINT NOT NULL"
       } ]
     },
-    "description" : "OverAggregate(orderBy=[rowtime ASC], window=[ RANG 
BETWEEN 10000 PRECEDING AND CURRENT ROW], select=[ts, a, b, c, rowtime, $5, 
LTCNT(a, $5) AS w0$o0, COUNT(a) AS w0$o1, $SUM0(a) AS w0$o2])"
+    "description" : "OverAggregate(orderBy=[rowtime ASC], window=[ RANGE 
BETWEEN 10000 PRECEDING AND CURRENT ROW], select=[ts, a, b, c, rowtime, $5, 
LTCNT(a, $5) AS w0$o0, COUNT(a) AS w0$o1, $SUM0(a) AS w0$o2])"
   }, {
     "id" : 15,
     "type" : "stream-exec-calc_1",
@@ -587,4 +587,4 @@
     },
     "shuffleMode" : "PIPELINED"
   } ]
-}
\ No newline at end of file
+}
diff --git 
a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-over-aggregate_1/over-aggregate-bounded-partitioned-rows-with-out-of-order-records/plan/over-aggregate-bounded-partitioned-rows-with-out-of-order-records.json
 
b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-over-aggregate_1/over-aggregate-bounded-partitioned-rows-with-out-of-order-records/plan/over-aggregate-bounded-partitioned-rows-with-out-of-order-records.json
index 8aad7619213..8905d1553ae 100644
--- 
a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-over-aggregate_1/over-aggregate-bounded-partitioned-rows-with-out-of-order-records/plan/over-aggregate-bounded-partitioned-rows-with-out-of-order-records.json
+++ 
b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-over-aggregate_1/over-aggregate-bounded-partitioned-rows-with-out-of-order-records/plan/over-aggregate-bounded-partitioned-rows-with-out-of-order-records.json
@@ -357,7 +357,7 @@
         "fieldType" : "BIGINT NOT NULL"
       } ]
     },
-    "description" : "OverAggregate(partitionBy=[c], orderBy=[rowtime ASC], 
window=[ RANG BETWEEN 10000 PRECEDING AND CURRENT ROW], select=[ts, a, b, c, 
rowtime, $5, LTCNT(a, $5) AS w0$o0, COUNT(a) AS w0$o1, $SUM0(a) AS w0$o2])",
+    "description" : "OverAggregate(partitionBy=[c], orderBy=[rowtime ASC], 
window=[ RANGE BETWEEN 10000 PRECEDING AND CURRENT ROW], select=[ts, a, b, c, 
rowtime, $5, LTCNT(a, $5) AS w0$o0, COUNT(a) AS w0$o1, $SUM0(a) AS w0$o2])",
     "unboundedOverVersion" : 1
   }, {
     "id" : 7,
diff --git 
a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-over-aggregate_1/over-aggregate-bounded-partitioned-rows/plan/over-aggregate-bounded-partitioned-rows.json
 
b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-over-aggregate_1/over-aggregate-bounded-partitioned-rows/plan/over-aggregate-bounded-partitioned-rows.json
index fa7b5b76970..55283e3a9ce 100644
--- 
a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-over-aggregate_1/over-aggregate-bounded-partitioned-rows/plan/over-aggregate-bounded-partitioned-rows.json
+++ 
b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-over-aggregate_1/over-aggregate-bounded-partitioned-rows/plan/over-aggregate-bounded-partitioned-rows.json
@@ -357,7 +357,7 @@
         "fieldType" : "BIGINT NOT NULL"
       } ]
     },
-    "description" : "OverAggregate(partitionBy=[c], orderBy=[rowtime ASC], 
window=[ RANG BETWEEN 10000 PRECEDING AND CURRENT ROW], select=[ts, a, b, c, 
rowtime, $5, LTCNT(a, $5) AS w0$o0, COUNT(a) AS w0$o1, $SUM0(a) AS w0$o2])",
+    "description" : "OverAggregate(partitionBy=[c], orderBy=[rowtime ASC], 
window=[ RANGE BETWEEN 10000 PRECEDING AND CURRENT ROW], select=[ts, a, b, c, 
rowtime, $5, LTCNT(a, $5) AS w0$o0, COUNT(a) AS w0$o1, $SUM0(a) AS w0$o2])",
     "unboundedOverVersion" : 1
   }, {
     "id" : 7,
diff --git 
a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-over-aggregate_1/over-aggregate-lag/plan/over-aggregate-lag.json
 
b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-over-aggregate_1/over-aggregate-lag/plan/over-aggregate-lag.json
index c09e4e1171c..0281b92a15b 100644
--- 
a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-over-aggregate_1/over-aggregate-lag/plan/over-aggregate-lag.json
+++ 
b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-over-aggregate_1/over-aggregate-lag/plan/over-aggregate-lag.json
@@ -210,7 +210,7 @@
         "fieldType" : "MAP<DOUBLE, DOUBLE>"
       } ]
     },
-    "description" : "OverAggregate(orderBy=[r_time ASC], window=[ RANG BETWEEN 
UNBOUNDED PRECEDING AND CURRENT ROW], select=[ts, b, r_time, LAG(b, 1) AS 
w0$o0])"
+    "description" : "OverAggregate(orderBy=[r_time ASC], window=[ RANGE 
BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], select=[ts, b, r_time, LAG(b, 1) 
AS w0$o0])"
   }, {
     "id" : 6,
     "type" : "stream-exec-calc_1",
@@ -315,4 +315,4 @@
     },
     "shuffleMode" : "PIPELINED"
   } ]
-}
\ No newline at end of file
+}
diff --git 
a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-over-aggregate_1/over-aggregate-non-time-range-unbounded-avg-append-mode/plan/over-aggregate-non-time-range-unbounded-avg-append-mode.json
 
b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-over-aggregate_1/over-aggregate-non-time-range-unbounded-avg-append-mode/plan/over-aggregate-non-time-range-unbounded-avg-append-mode.json
index 2ce44ad537f..e2fe499f6a6 100644
--- 
a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-over-aggregate_1/over-aggregate-non-time-range-unbounded-avg-append-mode/plan/over-aggregate-non-time-range-unbounded-avg-append-mode.json
+++ 
b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-over-aggregate_1/over-aggregate-non-time-range-unbounded-avg-append-mode/plan/over-aggregate-non-time-range-unbounded-avg-append-mode.json
@@ -94,7 +94,7 @@
       "priority" : 0
     } ],
     "outputType" : "ROW<`key` VARCHAR(2147483647), `val` BIGINT, `ts` BIGINT, 
`w0$o0` BIGINT NOT NULL, `w0$o1` BIGINT NOT NULL>",
-    "description" : "OverAggregate(partitionBy=[key], orderBy=[val ASC], 
window=[ RANG BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], select=[key, val, 
ts, COUNT(val) AS w0$o0, $SUM0(val) AS w0$o1])",
+    "description" : "OverAggregate(partitionBy=[key], orderBy=[val ASC], 
window=[ RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], select=[key, val, 
ts, COUNT(val) AS w0$o0, $SUM0(val) AS w0$o1])",
     "unboundedOverVersion" : 2
   }, {
     "id" : 21,
@@ -234,4 +234,4 @@
     },
     "shuffleMode" : "PIPELINED"
   } ]
-}
\ No newline at end of file
+}
diff --git 
a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-over-aggregate_1/over-aggregate-non-time-range-unbounded-multiple-aggs-append-mode/plan/over-aggregate-non-time-range-unbounded-multiple-aggs-append-mode.json
 
b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-over-aggregate_1/over-aggregate-non-time-range-unbounded-multiple-aggs-append-mode/plan/over-aggregate-non-time-range-unbounded-multiple-aggs-append-mode.json
index 1d3547ff339..ec700f0bf28 100644
--- 
a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-over-aggregate_1/over-aggregate-non-time-range-unbounded-multiple-aggs-append-mode/plan/over-aggregate-non-time-range-unbounded-multiple-aggs-append-mode.json
+++ 
b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-over-aggregate_1/over-aggregate-non-time-range-unbounded-multiple-aggs-append-mode/plan/over-aggregate-non-time-range-unbounded-multiple-aggs-append-mode.json
@@ -104,7 +104,7 @@
       "priority" : 0
     } ],
     "outputType" : "ROW<`key` VARCHAR(2147483647), `val` BIGINT, `ts` BIGINT, 
`w0$o0` BIGINT NOT NULL, `w0$o1` BIGINT NOT NULL, `w0$o2` BIGINT NOT NULL>",
-    "description" : "OverAggregate(partitionBy=[key], orderBy=[val ASC], 
window=[ RANG BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], select=[key, val, 
ts, COUNT(val) AS w0$o0, $SUM0(val) AS w0$o1, COUNT(key) AS w0$o2])",
+    "description" : "OverAggregate(partitionBy=[key], orderBy=[val ASC], 
window=[ RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], select=[key, val, 
ts, COUNT(val) AS w0$o0, $SUM0(val) AS w0$o1, COUNT(key) AS w0$o2])",
     "unboundedOverVersion" : 2
   }, {
     "id" : 26,
@@ -241,4 +241,4 @@
     },
     "shuffleMode" : "PIPELINED"
   } ]
-}
\ No newline at end of file
+}
diff --git 
a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-over-aggregate_1/over-aggregate-non-time-range-unbounded-sum-append-mode/plan/over-aggregate-non-time-range-unbounded-sum-append-mode.json
 
b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-over-aggregate_1/over-aggregate-non-time-range-unbounded-sum-append-mode/plan/over-aggregate-non-time-range-unbounded-sum-append-mode.json
index 9d0d09ed79c..1c73cdfc3ce 100644
--- 
a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-over-aggregate_1/over-aggregate-non-time-range-unbounded-sum-append-mode/plan/over-aggregate-non-time-range-unbounded-sum-append-mode.json
+++ 
b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-over-aggregate_1/over-aggregate-non-time-range-unbounded-sum-append-mode/plan/over-aggregate-non-time-range-unbounded-sum-append-mode.json
@@ -94,7 +94,7 @@
       "priority" : 0
     } ],
     "outputType" : "ROW<`key` VARCHAR(2147483647), `val` BIGINT, `ts` BIGINT, 
`w0$o0` BIGINT NOT NULL, `w0$o1` BIGINT NOT NULL>",
-    "description" : "OverAggregate(partitionBy=[key], orderBy=[val ASC], 
window=[ RANG BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], select=[key, val, 
ts, COUNT(val) AS w0$o0, $SUM0(val) AS w0$o1])",
+    "description" : "OverAggregate(partitionBy=[key], orderBy=[val ASC], 
window=[ RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], select=[key, val, 
ts, COUNT(val) AS w0$o0, $SUM0(val) AS w0$o1])",
     "unboundedOverVersion" : 2
   }, {
     "id" : 16,
@@ -224,4 +224,4 @@
     },
     "shuffleMode" : "PIPELINED"
   } ]
-}
\ No newline at end of file
+}
diff --git 
a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-over-aggregate_1/over-aggregate-non-time-range-unbounded-sum-no-partition-by/plan/over-aggregate-non-time-range-unbounded-sum-no-partition-by.json
 
b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-over-aggregate_1/over-aggregate-non-time-range-unbounded-sum-no-partition-by/plan/over-aggregate-non-time-range-unbounded-sum-no-partition-by.json
index 18e9ff775a3..e9a5e181521 100644
--- 
a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-over-aggregate_1/over-aggregate-non-time-range-unbounded-sum-no-partition-by/plan/over-aggregate-non-time-range-unbounded-sum-no-partition-by.json
+++ 
b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-over-aggregate_1/over-aggregate-non-time-range-unbounded-sum-no-partition-by/plan/over-aggregate-non-time-range-unbounded-sum-no-partition-by.json
@@ -93,7 +93,7 @@
       "priority" : 0
     } ],
     "outputType" : "ROW<`key` VARCHAR(2147483647), `val` BIGINT, `ts` BIGINT, 
`w0$o0` BIGINT NOT NULL, `w0$o1` BIGINT NOT NULL>",
-    "description" : "OverAggregate(orderBy=[val ASC], window=[ RANG BETWEEN 
UNBOUNDED PRECEDING AND CURRENT ROW], select=[key, val, ts, COUNT(val) AS 
w0$o0, $SUM0(val) AS w0$o1])",
+    "description" : "OverAggregate(orderBy=[val ASC], window=[ RANGE BETWEEN 
UNBOUNDED PRECEDING AND CURRENT ROW], select=[key, val, ts, COUNT(val) AS 
w0$o0, $SUM0(val) AS w0$o1])",
     "unboundedOverVersion" : 2
   }, {
     "id" : 31,
@@ -223,4 +223,4 @@
     },
     "shuffleMode" : "PIPELINED"
   } ]
-}
\ No newline at end of file
+}
diff --git 
a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-over-aggregate_1/over-aggregate-non-time-range-unbounded-sum-retract-mode-sink-primary-key/plan/over-aggregate-non-time-range-unbounded-sum-retract-mode-sink-primary-key.json
 
b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-over-aggregate_1/over-aggregate-non-time-range-unbounded-sum-retract-mode-sink-primary-key/plan/over-aggregate-non-time-range-unbounded-sum-retract-mode-sink-primary-key.json
index 09218eb7fc0..11c0c495d16 100644
--- 
a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-over-aggregate_1/over-aggregate-non-time-range-unbounded-sum-retract-mode-sink-primary-key/plan/over-aggregate-non-time-range-unbounded-sum-retract-mode-sink-primary-key.json
+++ 
b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-over-aggregate_1/over-aggregate-non-time-range-unbounded-sum-retract-mode-sink-primary-key/plan/over-aggregate-non-time-range-unbounded-sum-retract-mode-sink-primary-key.json
@@ -94,7 +94,7 @@
       "priority" : 0
     } ],
     "outputType" : "ROW<`key` VARCHAR(2147483647), `val` BIGINT, `ts` BIGINT, 
`w0$o0` BIGINT NOT NULL, `w0$o1` BIGINT NOT NULL>",
-    "description" : "OverAggregate(partitionBy=[key], orderBy=[val ASC], 
window=[ RANG BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], select=[key, val, 
ts, COUNT(val) AS w0$o0, $SUM0(val) AS w0$o1])",
+    "description" : "OverAggregate(partitionBy=[key], orderBy=[val ASC], 
window=[ RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], select=[key, val, 
ts, COUNT(val) AS w0$o0, $SUM0(val) AS w0$o1])",
     "unboundedOverVersion" : 2
   }, {
     "id" : 41,
@@ -235,4 +235,4 @@
     },
     "shuffleMode" : "PIPELINED"
   } ]
-}
\ No newline at end of file
+}
diff --git 
a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-over-aggregate_1/over-aggregate-non-time-range-unbounded-sum-retract-mode-sort-by-key/plan/over-aggregate-non-time-range-unbounded-sum-retract-mode-sort-by-key.json
 
b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-over-aggregate_1/over-aggregate-non-time-range-unbounded-sum-retract-mode-sort-by-key/plan/over-aggregate-non-time-range-unbounded-sum-retract-mode-sort-by-key.json
index 7e444e9bb37..056f57aaa63 100644
--- 
a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-over-aggregate_1/over-aggregate-non-time-range-unbounded-sum-retract-mode-sort-by-key/plan/over-aggregate-non-time-range-unbounded-sum-retract-mode-sort-by-key.json
+++ 
b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-over-aggregate_1/over-aggregate-non-time-range-unbounded-sum-retract-mode-sort-by-key/plan/over-aggregate-non-time-range-unbounded-sum-retract-mode-sort-by-key.json
@@ -91,7 +91,7 @@
       "priority" : 0
     } ],
     "outputType" : "ROW<`key` VARCHAR(2147483647), `val` BIGINT, `ts` BIGINT, 
`w0$o0` BIGINT NOT NULL, `w0$o1` BIGINT NOT NULL>",
-    "description" : "OverAggregate(partitionBy=[val], orderBy=[key ASC], 
window=[ RANG BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], select=[key, val, 
ts, COUNT(val) AS w0$o0, $SUM0(val) AS w0$o1])",
+    "description" : "OverAggregate(partitionBy=[val], orderBy=[key ASC], 
window=[ RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], select=[key, val, 
ts, COUNT(val) AS w0$o0, $SUM0(val) AS w0$o1])",
     "unboundedOverVersion" : 2
   }, {
     "id" : 4,
@@ -219,4 +219,4 @@
     },
     "shuffleMode" : "PIPELINED"
   } ]
-}
\ No newline at end of file
+}
diff --git 
a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-over-aggregate_1/over-aggregate-non-time-range-unbounded-sum-retract-mode-source-primary-key/plan/over-aggregate-non-time-range-unbounded-sum-retract-mode-source-primary-key.json
 
b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-over-aggregate_1/over-aggregate-non-time-range-unbounded-sum-retract-mode-source-primary-key/plan/over-aggregate-non-time-range-unbounded-sum-retract-mode-source-primary-key.json
index 4a8e2b5292d..1173ff5983b 100644
--- 
a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-over-aggregate_1/over-aggregate-non-time-range-unbounded-sum-retract-mode-source-primary-key/plan/over-aggregate-non-time-range-unbounded-sum-retract-mode-source-primary-key.json
+++ 
b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-over-aggregate_1/over-aggregate-non-time-range-unbounded-sum-retract-mode-source-primary-key/plan/over-aggregate-non-time-range-unbounded-sum-retract-mode-source-primary-key.json
@@ -99,7 +99,7 @@
       "priority" : 0
     } ],
     "outputType" : "ROW<`key` VARCHAR(2147483647) NOT NULL, `val` BIGINT, `ts` 
BIGINT, `w0$o0` BIGINT NOT NULL, `w0$o1` BIGINT NOT NULL>",
-    "description" : "OverAggregate(partitionBy=[key], orderBy=[val ASC], 
window=[ RANG BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], select=[key, val, 
ts, COUNT(val) AS w0$o0, $SUM0(val) AS w0$o1])",
+    "description" : "OverAggregate(partitionBy=[key], orderBy=[val ASC], 
window=[ RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], select=[key, val, 
ts, COUNT(val) AS w0$o0, $SUM0(val) AS w0$o1])",
     "unboundedOverVersion" : 2
   }, {
     "id" : 36,
@@ -230,4 +230,4 @@
     },
     "shuffleMode" : "PIPELINED"
   } ]
-}
\ No newline at end of file
+}
diff --git 
a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-over-aggregate_1/over-aggregate-non-time-range-unbounded-sum-retract-mode-source-sink-primary-key-partition-by-non-pk/plan/over-aggregate-non-time-range-unbounded-sum-retract-mode-source-sink-primary-key-partition-by-non-pk.json
 
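b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-over-aggregate_1/over-aggregate-non-time-range-unbounded-sum-retract-mode-source-sink-primary-key-partition-by-non-pk/plan/over-aggregate-non-time-range-unbounded-sum-retract-mode-source-sink-primary-key-partition-by-non-pk.json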
index 2bf686cb9cf..729736c24cf 100644
--- 
a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-over-aggregate_1/over-aggregate-non-time-range-unbounded-sum-retract-mode-source-sink-primary-key-partition-by-non-pk/plan/over-aggregate-non-time-range-unbounded-sum-retract-mode-source-sink-primary-key-partition-by-non-pk.json
+++ 
b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-over-aggregate_1/over-aggregate-non-time-range-unbounded-sum-retract-mode-source-sink-primary-key-partition-by-non-pk/plan/over-aggregate-non-time-range-unbounded-sum-retract-mode-source-sink-primary-key-partition-by-non-pk.json
@@ -99,7 +99,7 @@
       "priority" : 0
     } ],
     "outputType" : "ROW<`key` VARCHAR(2147483647), `val` BIGINT NOT NULL, `ts` 
BIGINT, `w0$o0` BIGINT NOT NULL, `w0$o1` BIGINT NOT NULL>",
-    "description" : "OverAggregate(partitionBy=[key], orderBy=[val ASC], 
window=[ RANG BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], select=[key, val, 
ts, COUNT(ts) AS w0$o0, $SUM0(ts) AS w0$o1])",
+    "description" : "OverAggregate(partitionBy=[key], orderBy=[val ASC], 
window=[ RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], select=[key, val, 
ts, COUNT(ts) AS w0$o0, $SUM0(ts) AS w0$o1])",
     "unboundedOverVersion" : 2
   }, {
     "id" : 54,
@@ -240,4 +240,4 @@
     },
     "shuffleMode" : "PIPELINED"
   } ]
-}
\ No newline at end of file
+}
diff --git 
a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-over-aggregate_1/over-aggregate-non-time-range-unbounded-sum-retract-mode-source-sink-primary-key/plan/over-aggregate-non-time-range-unbounded-sum-retract-mode-source-sink-primary-key.json
 
b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-over-aggregate_1/over-aggregate-non-time-range-unbounded-sum-retract-mode-source-sink-primary-key/plan/over-aggregate-non-time-range-unbounded-sum-retract-mode-source-sink-primary-key.json
index 4830f8e51fb..03b7c08e214 100644
--- 
a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-over-aggregate_1/over-aggregate-non-time-range-unbounded-sum-retract-mode-source-sink-primary-key/plan/over-aggregate-non-time-range-unbounded-sum-retract-mode-source-sink-primary-key.json
+++ 
b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-over-aggregate_1/over-aggregate-non-time-range-unbounded-sum-retract-mode-source-sink-primary-key/plan/over-aggregate-non-time-range-unbounded-sum-retract-mode-source-sink-primary-key.json
@@ -147,7 +147,7 @@
       "priority" : 0
     } ],
     "outputType" : "ROW<`key` VARCHAR(2147483647) NOT NULL, `val` BIGINT, `ts` 
BIGINT, `w0$o0` BIGINT NOT NULL, `w0$o1` BIGINT NOT NULL>",
-    "description" : "OverAggregate(partitionBy=[key], orderBy=[val ASC], 
window=[ RANG BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], select=[key, val, 
ts, COUNT(val) AS w0$o0, $SUM0(val) AS w0$o1])",
+    "description" : "OverAggregate(partitionBy=[key], orderBy=[val ASC], 
window=[ RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], select=[key, val, 
ts, COUNT(val) AS w0$o0, $SUM0(val) AS w0$o1])",
     "unboundedOverVersion" : 2
   }, {
     "id" : 49,
@@ -304,4 +304,4 @@
     },
     "shuffleMode" : "PIPELINED"
   } ]
-}
\ No newline at end of file
+}
diff --git 
a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-over-aggregate_1/over-aggregate-non-time-range-unbounded-sum-retract-mode/plan/over-aggregate-non-time-range-unbounded-sum-retract-mode.json
 
b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-over-aggregate_1/over-aggregate-non-time-range-unbounded-sum-retract-mode/plan/over-aggregate-non-time-range-unbounded-sum-retract-mode.json
index b7de4fb3f6d..b943cd06f9d 100644
--- 
a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-over-aggregate_1/over-aggregate-non-time-range-unbounded-sum-retract-mode/plan/over-aggregate-non-time-range-unbounded-sum-retract-mode.json
+++ 
b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-over-aggregate_1/over-aggregate-non-time-range-unbounded-sum-retract-mode/plan/over-aggregate-non-time-range-unbounded-sum-retract-mode.json
@@ -94,7 +94,7 @@
       "priority" : 0
     } ],
     "outputType" : "ROW<`key` VARCHAR(2147483647), `val` BIGINT, `ts` BIGINT, 
`w0$o0` BIGINT NOT NULL, `w0$o1` BIGINT NOT NULL>",
-    "description" : "OverAggregate(partitionBy=[key], orderBy=[val ASC], 
window=[ RANG BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], select=[key, val, 
ts, COUNT(val) AS w0$o0, $SUM0(val) AS w0$o1])",
+    "description" : "OverAggregate(partitionBy=[key], orderBy=[val ASC], 
window=[ RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], select=[key, val, 
ts, COUNT(val) AS w0$o0, $SUM0(val) AS w0$o1])",
     "unboundedOverVersion" : 2
   }, {
     "id" : 11,
@@ -224,4 +224,4 @@
     },
     "shuffleMode" : "PIPELINED"
   } ]
-}
\ No newline at end of file
+}
diff --git 
a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-over-aggregate_1/over-aggregate-unbounded-partitioned-rows-with-out-of-order-records/plan/over-aggregate-unbounded-partitioned-rows-with-out-of-order-records.json
 
b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-over-aggregate_1/over-aggregate-unbounded-partitioned-rows-with-out-of-order-records/plan/over-aggregate-unbounded-partitioned-rows-with-out-of-order-records.json
index 4ea0de87baf..c59f0058d77 100644
--- 
a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-over-aggregate_1/over-aggregate-unbounded-partitioned-rows-with-out-of-order-records/plan/over-aggregate-unbounded-partitioned-rows-with-out-of-order-records.json
+++ 
b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-over-aggregate_1/over-aggregate-unbounded-partitioned-rows-with-out-of-order-records/plan/over-aggregate-unbounded-partitioned-rows-with-out-of-order-records.json
@@ -347,7 +347,7 @@
         "fieldType" : "BIGINT NOT NULL"
       } ]
     },
-    "description" : "OverAggregate(partitionBy=[c], orderBy=[rowtime ASC], 
window=[ RANG BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], select=[ts, a, b, 
c, rowtime, $5, LTCNT(a, $5) AS w0$o0, COUNT(a) AS w0$o1, $SUM0(a) AS w0$o2])"
+    "description" : "OverAggregate(partitionBy=[c], orderBy=[rowtime ASC], 
window=[ RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], select=[ts, a, b, 
c, rowtime, $5, LTCNT(a, $5) AS w0$o0, COUNT(a) AS w0$o1, $SUM0(a) AS w0$o2])"
   }, {
     "id" : 23,
     "type" : "stream-exec-calc_1",
@@ -578,4 +578,4 @@
     },
     "shuffleMode" : "PIPELINED"
   } ]
-}
\ No newline at end of file
+}
diff --git 
a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-over-aggregate_1/over-aggregate-unbounded-partitioned-rows/plan/over-aggregate-unbounded-partitioned-rows.json
 
b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-over-aggregate_1/over-aggregate-unbounded-partitioned-rows/plan/over-aggregate-unbounded-partitioned-rows.json
index 6efa3a92257..fd860cd4594 100644
--- 
a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-over-aggregate_1/over-aggregate-unbounded-partitioned-rows/plan/over-aggregate-unbounded-partitioned-rows.json
+++ 
b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-over-aggregate_1/over-aggregate-unbounded-partitioned-rows/plan/over-aggregate-unbounded-partitioned-rows.json
@@ -347,7 +347,7 @@
         "fieldType" : "BIGINT NOT NULL"
       } ]
     },
-    "description" : "OverAggregate(partitionBy=[c], orderBy=[rowtime ASC], 
window=[ RANG BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], select=[ts, a, b, 
c, rowtime, $5, LTCNT(a, $5) AS w0$o0, COUNT(a) AS w0$o1, $SUM0(a) AS w0$o2])",
+    "description" : "OverAggregate(partitionBy=[c], orderBy=[rowtime ASC], 
window=[ RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], select=[ts, a, b, 
c, rowtime, $5, LTCNT(a, $5) AS w0$o0, COUNT(a) AS w0$o1, $SUM0(a) AS w0$o2])",
     "unboundedOverVersion" : 1
   }, {
     "id" : 7,

Reply via email to