This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 333885a5014bdd09bfe8ea6decfcd2c785c7603c
Author: yuzhao.cyz <[email protected]>
AuthorDate: Tue Mar 17 21:22:30 2020 +0800

    [FLINK-14338][table-planner][table-planner-blink] Update files because the 
builtin TUMBLE operator was renamed to $TUMBLE
    
    * This change was introduced in CALCITE-3382
---
 .../functions/sql/FlinkSqlOperatorTable.java       |   3 +-
 .../apache/flink/table/api/stream/ExplainTest.xml  |   4 +-
 .../planner/plan/batch/sql/DagOptimizationTest.xml |   4 +-
 .../batch/sql/agg/AggregateReduceGroupingTest.xml  |  38 +-
 .../plan/batch/sql/agg/WindowAggregateTest.xml     | 452 ++++++++++-----------
 .../logical/AggregateReduceGroupingRuleTest.xml    |  42 +-
 .../plan/stream/sql/MiniBatchIntervalInferTest.xml |  16 +-
 .../stream/sql/RelTimeIndicatorConverterTest.xml   |  12 +-
 .../planner/plan/stream/sql/TableSourceTest.xml    |   2 +-
 .../plan/stream/sql/agg/WindowAggregateTest.xml    | 216 ++++------
 .../flink/table/catalog/BasicOperatorTable.scala   |   2 +
 11 files changed, 376 insertions(+), 415 deletions(-)

diff --git 
a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/sql/FlinkSqlOperatorTable.java
 
b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/sql/FlinkSqlOperatorTable.java
index 6cae5de..7ce4bd4 100644
--- 
a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/sql/FlinkSqlOperatorTable.java
+++ 
b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/sql/FlinkSqlOperatorTable.java
@@ -776,7 +776,8 @@ public class FlinkSqlOperatorTable extends 
ReflectiveSqlOperatorTable {
         * We need custom group auxiliary functions in order to support nested 
windows.
         */
        public static final SqlGroupedWindowFunction TUMBLE = new 
SqlGroupedWindowFunction(
-                       SqlKind.TUMBLE, null,
+                       // The TUMBLE group function was hard code to $TUMBLE 
in CALCITE-3382.
+                       "$TUMBLE", SqlKind.TUMBLE, null,
                        OperandTypes.or(OperandTypes.DATETIME_INTERVAL, 
OperandTypes.DATETIME_INTERVAL_TIME)) {
                @Override
                public List<SqlGroupedWindowFunction> getAuxiliaryFunctions() {
diff --git 
a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/api/stream/ExplainTest.xml
 
b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/api/stream/ExplainTest.xml
index b90451f..7bdd8b8 100644
--- 
a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/api/stream/ExplainTest.xml
+++ 
b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/api/stream/ExplainTest.xml
@@ -638,7 +638,7 @@ SortLimit(orderBy=[a ASC], offset=[0], fetch=[5], 
updateAsRetraction=[false], ac
 LogicalSink(name=[`default_catalog`.`default_database`.`appendSink1`], 
fields=[a, b])
 +- LogicalProject(id1=[$0], EXPR$1=[$2])
    +- LogicalAggregate(group=[{0, 1}], EXPR$1=[LISTAGG($2, $3)])
-      +- LogicalProject(id1=[$0], $f1=[TUMBLE($2, 8000:INTERVAL SECOND)], 
text=[$1], $f3=[_UTF-16LE'#'])
+      +- LogicalProject(id1=[$0], $f1=[$TUMBLE($2, 8000:INTERVAL SECOND)], 
text=[$1], $f3=[_UTF-16LE'#'])
          +- LogicalFilter(condition=[AND(=($0, $3), >($2, -($7, 
300000:INTERVAL MINUTE)), <($2, +($7, 180000:INTERVAL MINUTE)))])
             +- LogicalJoin(condition=[true], joinType=[inner])
                :- LogicalWatermarkAssigner(rowtime=[rowtime], watermark=[-($2, 
0:INTERVAL MILLISECOND)])
@@ -778,7 +778,7 @@ Union(all=[true], union=[a, b, c])
 LogicalSink(name=[`default_catalog`.`default_database`.`appendSink1`], 
fields=[a, b])
 +- LogicalProject(id1=[$0], EXPR$1=[$2])
    +- LogicalAggregate(group=[{0, 1}], EXPR$1=[LISTAGG($2, $3)])
-      +- LogicalProject(id1=[$0], $f1=[TUMBLE($2, 8000:INTERVAL SECOND)], 
text=[$1], $f3=[_UTF-16LE'#'])
+      +- LogicalProject(id1=[$0], $f1=[$TUMBLE($2, 8000:INTERVAL SECOND)], 
text=[$1], $f3=[_UTF-16LE'#'])
          +- LogicalFilter(condition=[AND(=($0, $3), >($2, -($7, 
300000:INTERVAL MINUTE)), <($2, +($7, 180000:INTERVAL MINUTE)))])
             +- LogicalJoin(condition=[true], joinType=[inner])
                :- LogicalWatermarkAssigner(rowtime=[rowtime], watermark=[-($2, 
0:INTERVAL MILLISECOND)])
diff --git 
a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/DagOptimizationTest.xml
 
b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/DagOptimizationTest.xml
index 7fe88c8..f48686c 100644
--- 
a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/DagOptimizationTest.xml
+++ 
b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/DagOptimizationTest.xml
@@ -570,13 +570,13 @@ Sink(name=[`default_catalog`.`default_database`.`sink2`], 
fields=[total_min])
 LogicalSink(name=[`default_catalog`.`default_database`.`sink1`], fields=[a, 
sum_c, time, window_start, window_end])
 +- LogicalProject(a=[$1], sum_c=[$2], time=[TUMBLE_END($0)], 
window_start=[TUMBLE_START($0)], window_end=[TUMBLE_END($0)])
    +- LogicalAggregate(group=[{0, 1}], sum_c=[SUM($2)])
-      +- LogicalProject($f0=[TUMBLE($3, 15000:INTERVAL SECOND)], a=[$0], 
$f2=[CAST($2):DOUBLE])
+      +- LogicalProject($f0=[$TUMBLE($3, 15000:INTERVAL SECOND)], a=[$0], 
$f2=[CAST($2):DOUBLE])
          +- LogicalTableScan(table=[[default_catalog, default_database, 
MyTable2, source: [TestTableSource(a, b, c, ts)]]])
 
 LogicalSink(name=[`default_catalog`.`default_database`.`sink2`], fields=[a, 
sum_c, time])
 +- LogicalProject(a=[$1], sum_c=[$2], time=[TUMBLE_END($0)])
    +- LogicalAggregate(group=[{0, 1}], sum_c=[SUM($2)])
-      +- LogicalProject($f0=[TUMBLE($3, 15000:INTERVAL SECOND)], a=[$0], 
$f2=[CAST($2):DOUBLE])
+      +- LogicalProject($f0=[$TUMBLE($3, 15000:INTERVAL SECOND)], a=[$0], 
$f2=[CAST($2):DOUBLE])
          +- LogicalTableScan(table=[[default_catalog, default_database, 
MyTable2, source: [TestTableSource(a, b, c, ts)]]])
 ]]>
     </Resource>
diff --git 
a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/agg/AggregateReduceGroupingTest.xml
 
b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/agg/AggregateReduceGroupingTest.xml
index aa9e8cc..3dab6fa 100644
--- 
a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/agg/AggregateReduceGroupingTest.xml
+++ 
b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/agg/AggregateReduceGroupingTest.xml
@@ -359,7 +359,7 @@ HashAggregate(isMerge=[true], groupBy=[a3, b3, a1], 
auxGrouping=[b1], select=[a3
       <![CDATA[
 LogicalProject(a4=[$0], b4=[$1], EXPR$2=[$3])
 +- LogicalAggregate(group=[{0, 1, 2}], EXPR$2=[COUNT($3)])
-   +- LogicalProject(a4=[$0], b4=[$1], $f2=[TUMBLE($3, 900000:INTERVAL 
MINUTE)], c4=[$2])
+   +- LogicalProject(a4=[$0], b4=[$1], $f2=[$TUMBLE($3, 900000:INTERVAL 
MINUTE)], c4=[$2])
       +- LogicalTableScan(table=[[default_catalog, default_database, T4, 
source: [TestTableSource(a4, b4, c4, d4)]]])
 ]]>
     </Resource>
@@ -379,7 +379,7 @@ HashWindowAggregate(groupBy=[a4], auxGrouping=[b4], 
window=[TumblingGroupWindow(
       <![CDATA[
 LogicalProject(a4=[$0], c4=[$1], EXPR$2=[$3], EXPR$3=[$4])
 +- LogicalAggregate(group=[{0, 1, 2}], EXPR$2=[COUNT($3)], EXPR$3=[AVG($3)])
-   +- LogicalProject(a4=[$0], c4=[$2], $f2=[TUMBLE($3, 900000:INTERVAL 
MINUTE)], b4=[$1])
+   +- LogicalProject(a4=[$0], c4=[$2], $f2=[$TUMBLE($3, 900000:INTERVAL 
MINUTE)], b4=[$1])
       +- LogicalTableScan(table=[[default_catalog, default_database, T4, 
source: [TestTableSource(a4, b4, c4, d4)]]])
 ]]>
     </Resource>
@@ -400,7 +400,7 @@ HashWindowAggregate(groupBy=[a4], auxGrouping=[c4], 
window=[TumblingGroupWindow(
 LogicalAggregate(group=[{0, 1, 2}], EXPR$3=[COUNT($3)])
 +- LogicalProject(a4=[$0], c4=[$1], s=[TUMBLE_START($2)], b4=[$3])
    +- LogicalAggregate(group=[{0, 1, 2}], b4=[VAR_POP($3)])
-      +- LogicalProject(a4=[$0], c4=[$2], $f2=[TUMBLE($3, 900000:INTERVAL 
MINUTE)], b4=[$1])
+      +- LogicalProject(a4=[$0], c4=[$2], $f2=[$TUMBLE($3, 900000:INTERVAL 
MINUTE)], b4=[$1])
          +- LogicalTableScan(table=[[default_catalog, default_database, T4, 
source: [TestTableSource(a4, b4, c4, d4)]]])
 ]]>
     </Resource>
@@ -427,7 +427,7 @@ Calc(select=[a4, c4, s, EXPR$3])
 LogicalAggregate(group=[{0, 1, 2}], EXPR$3=[COUNT($3)])
 +- LogicalProject(a4=[$0], c4=[$1], e=[TUMBLE_END($2)], b4=[$3])
    +- LogicalAggregate(group=[{0, 1, 2}], b4=[VAR_POP($3)])
-      +- LogicalProject(a4=[$0], c4=[$2], $f2=[TUMBLE($3, 900000:INTERVAL 
MINUTE)], b4=[$1])
+      +- LogicalProject(a4=[$0], c4=[$2], $f2=[$TUMBLE($3, 900000:INTERVAL 
MINUTE)], b4=[$1])
          +- LogicalTableScan(table=[[default_catalog, default_database, T4, 
source: [TestTableSource(a4, b4, c4, d4)]]])
 ]]>
     </Resource>
@@ -454,7 +454,7 @@ Calc(select=[a4, c4, e, EXPR$3])
 LogicalAggregate(group=[{0, 1, 2}], EXPR$3=[COUNT()])
 +- LogicalProject(a4=[$0], b4=[$3], c4=[$1])
    +- LogicalAggregate(group=[{0, 1, 2}], b4=[VAR_POP($3)])
-      +- LogicalProject(a4=[$0], c4=[$2], $f2=[TUMBLE($3, 900000:INTERVAL 
MINUTE)], b4=[$1])
+      +- LogicalProject(a4=[$0], c4=[$2], $f2=[$TUMBLE($3, 900000:INTERVAL 
MINUTE)], b4=[$1])
          +- LogicalTableScan(table=[[default_catalog, default_database, T4, 
source: [TestTableSource(a4, b4, c4, d4)]]])
 ]]>
     </Resource>
@@ -487,7 +487,7 @@ Calc(select=[a1, b1, c1, EXPR$3])
 +- HashAggregate(isMerge=[true], groupBy=[a1, b1, c1, $e], select=[a1, b1, c1, 
$e, Final_COUNT(count$0) AS EXPR$3])
    +- Exchange(distribution=[hash[a1, b1, c1, $e]])
       +- LocalHashAggregate(groupBy=[a1, b1, c1, $e], select=[a1, b1, c1, $e, 
Partial_COUNT(d1) AS count$0])
-         +- Expand(projects=[{a1=[$0], b1=[$1], c1=[$2], d1=[$3], $e=[0]}, 
{a1=[$0], b1=[$1], c1=[null], d1=[$3], $e=[1]}, {a1=[$0], b1=[null], c1=[$2], 
d1=[$3], $e=[2]}, {a1=[$0], b1=[null], c1=[null], d1=[$3], $e=[3]}, {a1=[null], 
b1=[$1], c1=[$2], d1=[$3], $e=[4]}, {a1=[null], b1=[$1], c1=[null], d1=[$3], 
$e=[5]}, {a1=[null], b1=[null], c1=[$2], d1=[$3], $e=[6]}, {a1=[null], 
b1=[null], c1=[null], d1=[$3], $e=[7]}], projects=[{a1, b1, c1, d1, 0 AS $e}, 
{a1, b1, null AS c1, d1, 1 AS $e} [...]
+         +- Expand(projects=[a1, b1, c1, d1, $e], projects=[{a1, b1, c1, d1, 0 
AS $e}, {a1, b1, null AS c1, d1, 1 AS $e}, {a1, null AS b1, c1, d1, 2 AS $e}, 
{a1, null AS b1, null AS c1, d1, 3 AS $e}, {null AS a1, b1, c1, d1, 4 AS $e}, 
{null AS a1, b1, null AS c1, d1, 5 AS $e}, {null AS a1, null AS b1, c1, d1, 6 
AS $e}, {null AS a1, null AS b1, null AS c1, d1, 7 AS $e}])
             +- TableSourceScan(table=[[default_catalog, default_database, T1, 
source: [TestTableSource(a1, b1, c1, d1)]]], fields=[a1, b1, c1, d1])
 ]]>
     </Resource>
@@ -507,7 +507,7 @@ LogicalAggregate(group=[{0, 1, 2}], groups=[[{0, 1}, {0, 
2}]], EXPR$3=[COUNT($3)
 Calc(select=[a1, b1, c1, EXPR$3])
 +- HashAggregate(isMerge=[false], groupBy=[a1, b1, c1, $e], select=[a1, b1, 
c1, $e, COUNT(d1) AS EXPR$3])
    +- Exchange(distribution=[hash[a1, b1, c1, $e]])
-      +- Expand(projects=[{a1=[$0], b1=[$1], c1=[null], d1=[$3], $e=[1]}, 
{a1=[$0], b1=[null], c1=[$2], d1=[$3], $e=[2]}], projects=[{a1, b1, null AS c1, 
d1, 1 AS $e}, {a1, null AS b1, c1, d1, 2 AS $e}])
+      +- Expand(projects=[a1, b1, c1, d1, $e], projects=[{a1, b1, null AS c1, 
d1, 1 AS $e}, {a1, null AS b1, c1, d1, 2 AS $e}])
          +- TableSourceScan(table=[[default_catalog, default_database, T1, 
source: [TestTableSource(a1, b1, c1, d1)]]], fields=[a1, b1, c1, d1])
 ]]>
     </Resource>
@@ -530,7 +530,7 @@ Calc(select=[a1, s])
 +- HashAggregate(isMerge=[true], groupBy=[a1, c1, $e], select=[a1, c1, $e, 
Final_SUM(sum$0) AS s])
    +- Exchange(distribution=[hash[a1, c1, $e]])
       +- LocalHashAggregate(groupBy=[a1, c1, $e], select=[a1, c1, $e, 
Partial_SUM(b1) AS sum$0])
-         +- Expand(projects=[{a1=[$0], c1=[$1], b1=[$2], $e=[0]}, {a1=[$0], 
c1=[null], b1=[$2], $e=[1]}, {a1=[null], c1=[null], b1=[$2], $e=[3]}], 
projects=[{a1, c1, b1, 0 AS $e}, {a1, null AS c1, b1, 1 AS $e}, {null AS a1, 
null AS c1, b1, 3 AS $e}])
+         +- Expand(projects=[a1, c1, b1, $e], projects=[{a1, c1, b1, 0 AS $e}, 
{a1, null AS c1, b1, 1 AS $e}, {null AS a1, null AS c1, b1, 3 AS $e}])
             +- Calc(select=[a1, c1, b1])
                +- TableSourceScan(table=[[default_catalog, default_database, 
T1, source: [TestTableSource(a1, b1, c1, d1)]]], fields=[a1, b1, c1, d1])
 ]]>
@@ -552,7 +552,7 @@ LogicalProject(a1=[$0], b1=[$1], c1=[$2], EXPR$3=[$4])
 Calc(select=[a1, b1, c1, EXPR$3])
 +- HashAggregate(isMerge=[false], groupBy=[a1, c1, d1, $e], auxGrouping=[b1], 
select=[a1, c1, d1, $e, b1, COUNT(d1_0) AS EXPR$3])
    +- Exchange(distribution=[hash[a1, c1, d1, $e]])
-      +- Expand(projects=[{a1=[$0], b1=[$1], c1=[$2], d1=[null], $e=[1], 
d1_0=[$3]}, {a1=[$0], b1=[$1], c1=[null], d1=[$3], $e=[2], d1_0=[$3]}], 
projects=[{a1, b1, c1, null AS d1, 1 AS $e, d1 AS d1_0}, {a1, b1, null AS c1, 
d1, 2 AS $e, d1 AS d1_0}])
+      +- Expand(projects=[a1, b1, c1, d1, $e, d1_0], projects=[{a1, b1, c1, 
null AS d1, 1 AS $e, d1 AS d1_0}, {a1, b1, null AS c1, d1, 2 AS $e, d1 AS 
d1_0}])
          +- TableSourceScan(table=[[default_catalog, default_database, T1, 
source: [TestTableSource(a1, b1, c1, d1)]]], fields=[a1, b1, c1, d1])
 ]]>
     </Resource>
@@ -614,7 +614,7 @@ Calc(select=[a1, b1, c1, EXPR$3])
 +- HashAggregate(isMerge=[true], groupBy=[a1, b1, c1, $e], select=[a1, b1, c1, 
$e, Final_COUNT(count$0) AS EXPR$3])
    +- Exchange(distribution=[hash[a1, b1, c1, $e]])
       +- LocalHashAggregate(groupBy=[a1, b1, c1, $e], select=[a1, b1, c1, $e, 
Partial_COUNT(d1) AS count$0])
-         +- Expand(projects=[{a1=[$0], b1=[$1], c1=[$2], d1=[$3], $e=[0]}, 
{a1=[$0], b1=[$1], c1=[null], d1=[$3], $e=[1]}, {a1=[$0], b1=[null], c1=[null], 
d1=[$3], $e=[3]}, {a1=[null], b1=[null], c1=[null], d1=[$3], $e=[7]}], 
projects=[{a1, b1, c1, d1, 0 AS $e}, {a1, b1, null AS c1, d1, 1 AS $e}, {a1, 
null AS b1, null AS c1, d1, 3 AS $e}, {null AS a1, null AS b1, null AS c1, d1, 
7 AS $e}])
+         +- Expand(projects=[a1, b1, c1, d1, $e], projects=[{a1, b1, c1, d1, 0 
AS $e}, {a1, b1, null AS c1, d1, 1 AS $e}, {a1, null AS b1, null AS c1, d1, 3 
AS $e}, {null AS a1, null AS b1, null AS c1, d1, 7 AS $e}])
             +- TableSourceScan(table=[[default_catalog, default_database, T1, 
source: [TestTableSource(a1, b1, c1, d1)]]], fields=[a1, b1, c1, d1])
 ]]>
     </Resource>
@@ -697,7 +697,7 @@ LogicalAggregate(group=[{0, 1}], EXPR$2=[COUNT(DISTINCT 
$2)], EXPR$3=[SUM(DISTIN
 HashAggregate(isMerge=[false], groupBy=[a1], auxGrouping=[d1], select=[a1, d1, 
COUNT(c1) FILTER $g_4 AS EXPR$2, SUM(b1) FILTER $g_2 AS EXPR$3])
 +- Exchange(distribution=[hash[a1]])
    +- Calc(select=[a1, b1, c1, d1, =(CASE(=($e, 2:BIGINT), 2:BIGINT, 
4:BIGINT), 2) AS $g_2, =(CASE(=($e, 2:BIGINT), 2:BIGINT, 4:BIGINT), 4) AS $g_4])
-      +- Expand(projects=[{a1=[$0], b1=[$1], c1=[null], d1=[$3], $e=[2]}, 
{a1=[$0], b1=[null], c1=[$2], d1=[$3], $e=[4]}], projects=[{a1, b1, null AS c1, 
d1, 2 AS $e}, {a1, null AS b1, c1, d1, 4 AS $e}])
+      +- Expand(projects=[a1, b1, c1, d1, $e], projects=[{a1, b1, null AS c1, 
d1, 2 AS $e}, {a1, null AS b1, c1, d1, 4 AS $e}])
          +- TableSourceScan(table=[[default_catalog, default_database, T1, 
source: [TestTableSource(a1, b1, c1, d1)]]], fields=[a1, b1, c1, d1])
 ]]>
     </Resource>
@@ -715,7 +715,7 @@ LogicalAggregate(group=[{0}], EXPR$1=[SUM(DISTINCT $1)], 
EXPR$2=[MAX($1)], EXPR$
     </Resource>
     <Resource name="planAfter">
       <![CDATA[
-Calc(select=[a1, b1, b1 AS b10, c1])
+Calc(select=[a1, b1 AS EXPR$1, b1 AS EXPR$2, c1 AS EXPR$3])
 +- TableSourceScan(table=[[default_catalog, default_database, T1, source: 
[TestTableSource(a1, b1, c1, d1)]]], fields=[a1, b1, c1, d1])
 ]]>
     </Resource>
@@ -737,7 +737,7 @@ SortAggregate(isMerge=[false], groupBy=[a1], select=[a1, 
MIN(d1) FILTER $g_1 AS
 +- Calc(select=[a1, c1, d1, b1 AS EXPR$3, b1 AS EXPR$4, =(CASE(=($e, 
0:BIGINT), 0:BIGINT, 1:BIGINT), 0) AS $g_0, =(CASE(=($e, 0:BIGINT), 0:BIGINT, 
1:BIGINT), 1) AS $g_1])
    +- Sort(orderBy=[a1 ASC])
       +- Exchange(distribution=[hash[a1]])
-         +- Expand(projects=[{a1=[$0], b1=[$1], c1=[$2], d1=[$3], $e=[0]}, 
{a1=[$0], b1=[$1], c1=[null], d1=[$3], $e=[1]}], projects=[{a1, b1, c1, d1, 0 
AS $e}, {a1, b1, null AS c1, d1, 1 AS $e}])
+         +- Expand(projects=[a1, b1, c1, d1, $e], projects=[{a1, b1, c1, d1, 0 
AS $e}, {a1, b1, null AS c1, d1, 1 AS $e}])
             +- TableSourceScan(table=[[default_catalog, default_database, T1, 
source: [TestTableSource(a1, b1, c1, d1)]]], fields=[a1, b1, c1, d1])
 ]]>
     </Resource>
@@ -760,7 +760,7 @@ Calc(select=[EXPR$0])
 +- HashAggregate(isMerge=[true], groupBy=[$f0], select=[$f0, 
Final_COUNT(count$0) AS EXPR$0])
    +- Exchange(distribution=[hash[$f0]])
       +- LocalHashAggregate(groupBy=[$f0], select=[$f0, Partial_COUNT(c1) AS 
count$0])
-         +- Calc(select=[1 AS $f0, true AS $f1, c1])
+         +- Calc(select=[1 AS $f0, c1])
             +- TableSourceScan(table=[[default_catalog, default_database, T1, 
source: [TestTableSource(a1, b1, c1, d1)]]], fields=[a1, b1, c1, d1])
 ]]>
     </Resource>
@@ -841,7 +841,7 @@ LogicalProject(a1=[$0], b1=[$1], EXPR$2=[$4])
       <![CDATA[
 HashAggregate(isMerge=[false], groupBy=[a1], auxGrouping=[b1], select=[a1, b1, 
COUNT(c1) AS EXPR$2])
 +- Exchange(distribution=[hash[a1]])
-   +- Calc(select=[a1, b1, 1 AS $f2, true AS $f3, c1])
+   +- Calc(select=[a1, b1, c1])
       +- TableSourceScan(table=[[default_catalog, default_database, T1, 
source: [TestTableSource(a1, b1, c1, d1)]]], fields=[a1, b1, c1, d1])
 ]]>
     </Resource>
@@ -902,7 +902,7 @@ LogicalAggregate(group=[{0}], EXPR$1=[COUNT(DISTINCT $1)], 
EXPR$2=[SUM($1)])
 HashAggregate(isMerge=[false], groupBy=[a1], select=[a1, COUNT(b1) FILTER $g_0 
AS EXPR$1, MIN(EXPR$2) FILTER $g_1 AS EXPR$2])
 +- Exchange(distribution=[hash[a1]])
    +- Calc(select=[a1, b1, b1_0 AS EXPR$2, =(CASE(=($e, 0:BIGINT), 0:BIGINT, 
1:BIGINT), 0) AS $g_0, =(CASE(=($e, 0:BIGINT), 0:BIGINT, 1:BIGINT), 1) AS $g_1])
-      +- Expand(projects=[{a1=[$0], b1=[$1], c1=[$2], d1=[$3], $e=[0], 
b1_0=[$1]}, {a1=[$0], b1=[null], c1=[$2], d1=[$3], $e=[1], b1_0=[$1]}], 
projects=[{a1, b1, c1, d1, 0 AS $e, b1 AS b1_0}, {a1, null AS b1, c1, d1, 1 AS 
$e, b1 AS b1_0}])
+      +- Expand(projects=[a1, b1, c1, d1, $e, b1_0], projects=[{a1, b1, c1, 
d1, 0 AS $e, b1 AS b1_0}, {a1, null AS b1, c1, d1, 1 AS $e, b1 AS b1_0}])
          +- TableSourceScan(table=[[default_catalog, default_database, T1, 
source: [TestTableSource(a1, b1, c1, d1)]]], fields=[a1, b1, c1, d1])
 ]]>
     </Resource>
@@ -924,7 +924,7 @@ SortAggregate(isMerge=[false], groupBy=[a1], select=[a1, 
MIN(c1) FILTER $g_1 AS
 +- Sort(orderBy=[a1 ASC])
    +- Exchange(distribution=[hash[a1]])
       +- Calc(select=[a1, b1, c1, b1_0 AS EXPR$3, =(CASE(=($e, 0:BIGINT), 
0:BIGINT, 1:BIGINT), 0) AS $g_0, =(CASE(=($e, 0:BIGINT), 0:BIGINT, 1:BIGINT), 
1) AS $g_1])
-         +- Expand(projects=[{a1=[$0], b1=[$1], c1=[$2], d1=[$3], $e=[0], 
b1_0=[$1]}, {a1=[$0], b1=[null], c1=[$2], d1=[$3], $e=[1], b1_0=[$1]}], 
projects=[{a1, b1, c1, d1, 0 AS $e, b1 AS b1_0}, {a1, null AS b1, c1, d1, 1 AS 
$e, b1 AS b1_0}])
+         +- Expand(projects=[a1, b1, c1, d1, $e, b1_0], projects=[{a1, b1, c1, 
d1, 0 AS $e, b1 AS b1_0}, {a1, null AS b1, c1, d1, 1 AS $e, b1 AS b1_0}])
             +- TableSourceScan(table=[[default_catalog, default_database, T1, 
source: [TestTableSource(a1, b1, c1, d1)]]], fields=[a1, b1, c1, d1])
 ]]>
     </Resource>
@@ -945,7 +945,7 @@ LogicalAggregate(group=[{0}], EXPR$1=[COUNT(DISTINCT $1)], 
EXPR$2=[SUM($2)])
 HashAggregate(isMerge=[false], groupBy=[a1], select=[a1, COUNT(c1) FILTER $g_0 
AS EXPR$1, MIN(EXPR$2) FILTER $g_1 AS EXPR$2])
 +- Exchange(distribution=[hash[a1]])
    +- Calc(select=[a1, c1, b1 AS EXPR$2, =(CASE(=($e, 0:BIGINT), 0:BIGINT, 
1:BIGINT), 0) AS $g_0, =(CASE(=($e, 0:BIGINT), 0:BIGINT, 1:BIGINT), 1) AS $g_1])
-      +- Expand(projects=[{a1=[$0], b1=[$1], c1=[$2], d1=[$3], $e=[0]}, 
{a1=[$0], b1=[$1], c1=[null], d1=[$3], $e=[1]}], projects=[{a1, b1, c1, d1, 0 
AS $e}, {a1, b1, null AS c1, d1, 1 AS $e}])
+      +- Expand(projects=[a1, b1, c1, d1, $e], projects=[{a1, b1, c1, d1, 0 AS 
$e}, {a1, b1, null AS c1, d1, 1 AS $e}])
          +- TableSourceScan(table=[[default_catalog, default_database, T1, 
source: [TestTableSource(a1, b1, c1, d1)]]], fields=[a1, b1, c1, d1])
 ]]>
     </Resource>
@@ -967,7 +967,7 @@ SortAggregate(isMerge=[false], groupBy=[a1], select=[a1, 
MIN(d1) FILTER $g_1 AS
 +- Sort(orderBy=[a1 ASC])
    +- Exchange(distribution=[hash[a1]])
       +- Calc(select=[a1, c1, d1, b1 AS EXPR$3, =(CASE(=($e, 0:BIGINT), 
0:BIGINT, 1:BIGINT), 0) AS $g_0, =(CASE(=($e, 0:BIGINT), 0:BIGINT, 1:BIGINT), 
1) AS $g_1])
-         +- Expand(projects=[{a1=[$0], b1=[$1], c1=[$2], d1=[$3], $e=[0]}, 
{a1=[$0], b1=[$1], c1=[null], d1=[$3], $e=[1]}], projects=[{a1, b1, c1, d1, 0 
AS $e}, {a1, b1, null AS c1, d1, 1 AS $e}])
+         +- Expand(projects=[a1, b1, c1, d1, $e], projects=[{a1, b1, c1, d1, 0 
AS $e}, {a1, b1, null AS c1, d1, 1 AS $e}])
             +- TableSourceScan(table=[[default_catalog, default_database, T1, 
source: [TestTableSource(a1, b1, c1, d1)]]], fields=[a1, b1, c1, d1])
 ]]>
     </Resource>
diff --git 
a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/agg/WindowAggregateTest.xml
 
b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/agg/WindowAggregateTest.xml
index 07b7b71..58fc85d 100644
--- 
a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/agg/WindowAggregateTest.xml
+++ 
b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/agg/WindowAggregateTest.xml
@@ -33,7 +33,7 @@ FROM MyTable1
       <![CDATA[
 LogicalProject(EXPR$0=[$1], EXPR$1=[$2], EXPR$2=[$3], EXPR$3=[$4], 
EXPR$4=[TUMBLE_START($0)], EXPR$5=[TUMBLE_END($0)])
 +- LogicalAggregate(group=[{0}], EXPR$0=[VAR_POP($1)], EXPR$1=[VAR_SAMP($1)], 
EXPR$2=[STDDEV_POP($1)], EXPR$3=[STDDEV_SAMP($1)])
-   +- LogicalProject($f0=[TUMBLE($0, 900000:INTERVAL MINUTE)], b=[$2])
+   +- LogicalProject($f0=[$TUMBLE($0, 900000:INTERVAL MINUTE)], b=[$2])
       +- LogicalTableScan(table=[[default_catalog, default_database, MyTable1, 
source: [TestTableSource(ts, a, b, c)]]])
 ]]>
     </Resource>
@@ -65,7 +65,7 @@ FROM MyTable1
       <![CDATA[
 LogicalProject(EXPR$0=[$1], EXPR$1=[$2], EXPR$2=[$3], EXPR$3=[$4], 
EXPR$4=[TUMBLE_START($0)], EXPR$5=[TUMBLE_END($0)])
 +- LogicalAggregate(group=[{0}], EXPR$0=[VAR_POP($1)], EXPR$1=[VAR_SAMP($1)], 
EXPR$2=[STDDEV_POP($1)], EXPR$3=[STDDEV_SAMP($1)])
-   +- LogicalProject($f0=[TUMBLE($0, 900000:INTERVAL MINUTE)], b=[$2])
+   +- LogicalProject($f0=[$TUMBLE($0, 900000:INTERVAL MINUTE)], b=[$2])
       +- LogicalTableScan(table=[[default_catalog, default_database, MyTable1, 
source: [TestTableSource(ts, a, b, c)]]])
 ]]>
     </Resource>
@@ -96,7 +96,7 @@ FROM MyTable1
       <![CDATA[
 LogicalProject(EXPR$0=[$1], EXPR$1=[$2], EXPR$2=[$3], EXPR$3=[$4], 
EXPR$4=[TUMBLE_START($0)], EXPR$5=[TUMBLE_END($0)])
 +- LogicalAggregate(group=[{0}], EXPR$0=[VAR_POP($1)], EXPR$1=[VAR_SAMP($1)], 
EXPR$2=[STDDEV_POP($1)], EXPR$3=[STDDEV_SAMP($1)])
-   +- LogicalProject($f0=[TUMBLE($0, 900000:INTERVAL MINUTE)], b=[$2])
+   +- LogicalProject($f0=[$TUMBLE($0, 900000:INTERVAL MINUTE)], b=[$2])
       +- LogicalTableScan(table=[[default_catalog, default_database, MyTable1, 
source: [TestTableSource(ts, a, b, c)]]])
 ]]>
     </Resource>
@@ -111,154 +111,6 @@ Calc(select=[CAST(/(-($f0, /(*($f1, $f1), $f2)), $f2)) AS 
EXPR$0, CAST(/(-($f0,
 ]]>
     </Resource>
   </TestCase>
-  <TestCase name="testWindowAggregateWithDifferentWindows[aggStrategy=AUTO]">
-    <Resource name="sql">
-      <![CDATA[
-WITH window_1h AS (
-    SELECT 1
-    FROM MyTable2
-    GROUP BY HOP(`ts`, INTERVAL '1' HOUR, INTERVAL '1' HOUR)
-),
-
-window_2h AS (
-    SELECT 1
-    FROM MyTable2
-    GROUP BY HOP(`ts`, INTERVAL '1' HOUR, INTERVAL '2' HOUR)
-)
-
-(SELECT * FROM window_1h)
-UNION ALL
-(SELECT * FROM window_2h)
-]]>
-    </Resource>
-    <Resource name="planBefore">
-      <![CDATA[
-LogicalUnion(all=[true])
-:- LogicalProject(EXPR$0=[1])
-:  +- LogicalAggregate(group=[{0}])
-:     +- LogicalProject($f0=[HOP($4, 3600000:INTERVAL HOUR, 3600000:INTERVAL 
HOUR)])
-:        +- LogicalTableScan(table=[[default_catalog, default_database, 
MyTable2, source: [TestTableSource(a, b, c, d, ts)]]])
-+- LogicalProject(EXPR$0=[1])
-   +- LogicalAggregate(group=[{0}])
-      +- LogicalProject($f0=[HOP($4, 3600000:INTERVAL HOUR, 7200000:INTERVAL 
HOUR)])
-         +- LogicalTableScan(table=[[default_catalog, default_database, 
MyTable2, source: [TestTableSource(a, b, c, d, ts)]]])
-]]>
-    </Resource>
-    <Resource name="planAfter">
-      <![CDATA[
-Union(all=[true], union=[EXPR$0])
-:- Calc(select=[1 AS EXPR$0])
-:  +- HashWindowAggregate(window=[SlidingGroupWindow('w$, ts, 3600000, 
3600000)], select=[])
-:     +- Exchange(distribution=[single])
-:        +- LocalHashWindowAggregate(window=[SlidingGroupWindow('w$, ts, 
3600000, 3600000)], select=[])
-:           +- Calc(select=[ts], reuse_id=[1])
-:              +- TableSourceScan(table=[[default_catalog, default_database, 
MyTable2, source: [TestTableSource(a, b, c, d, ts)]]], fields=[a, b, c, d, ts])
-+- Calc(select=[1 AS EXPR$0])
-   +- HashWindowAggregate(window=[SlidingGroupWindow('w$, ts, 7200000, 
3600000)], select=[])
-      +- Exchange(distribution=[single])
-         +- LocalHashWindowAggregate(window=[SlidingGroupWindow('w$, ts, 
7200000, 3600000)], select=[])
-            +- Reused(reference_id=[1])
-]]>
-    </Resource>
-  </TestCase>
-  <TestCase 
name="testWindowAggregateWithDifferentWindows[aggStrategy=ONE_PHASE]">
-    <Resource name="sql">
-      <![CDATA[
-WITH window_1h AS (
-    SELECT 1
-    FROM MyTable2
-    GROUP BY HOP(`ts`, INTERVAL '1' HOUR, INTERVAL '1' HOUR)
-),
-
-window_2h AS (
-    SELECT 1
-    FROM MyTable2
-    GROUP BY HOP(`ts`, INTERVAL '1' HOUR, INTERVAL '2' HOUR)
-)
-
-(SELECT * FROM window_1h)
-UNION ALL
-(SELECT * FROM window_2h)
-]]>
-    </Resource>
-    <Resource name="planBefore">
-      <![CDATA[
-LogicalUnion(all=[true])
-:- LogicalProject(EXPR$0=[1])
-:  +- LogicalAggregate(group=[{0}])
-:     +- LogicalProject($f0=[HOP($4, 3600000:INTERVAL HOUR, 3600000:INTERVAL 
HOUR)])
-:        +- LogicalTableScan(table=[[default_catalog, default_database, 
MyTable2, source: [TestTableSource(a, b, c, d, ts)]]])
-+- LogicalProject(EXPR$0=[1])
-   +- LogicalAggregate(group=[{0}])
-      +- LogicalProject($f0=[HOP($4, 3600000:INTERVAL HOUR, 7200000:INTERVAL 
HOUR)])
-         +- LogicalTableScan(table=[[default_catalog, default_database, 
MyTable2, source: [TestTableSource(a, b, c, d, ts)]]])
-]]>
-    </Resource>
-    <Resource name="planAfter">
-      <![CDATA[
-Union(all=[true], union=[EXPR$0])
-:- Calc(select=[1 AS EXPR$0])
-:  +- HashWindowAggregate(window=[SlidingGroupWindow('w$, ts, 3600000, 
3600000)], select=[])
-:     +- Exchange(distribution=[single], reuse_id=[1])
-:        +- Calc(select=[ts])
-:           +- TableSourceScan(table=[[default_catalog, default_database, 
MyTable2, source: [TestTableSource(a, b, c, d, ts)]]], fields=[a, b, c, d, ts])
-+- Calc(select=[1 AS EXPR$0])
-   +- SortWindowAggregate(window=[SlidingGroupWindow('w$, ts, 7200000, 
3600000)], select=[])
-      +- Sort(orderBy=[ts ASC])
-         +- Reused(reference_id=[1])
-]]>
-    </Resource>
-  </TestCase>
-  <TestCase 
name="testWindowAggregateWithDifferentWindows[aggStrategy=TWO_PHASE]">
-    <Resource name="sql">
-      <![CDATA[
-WITH window_1h AS (
-    SELECT 1
-    FROM MyTable2
-    GROUP BY HOP(`ts`, INTERVAL '1' HOUR, INTERVAL '1' HOUR)
-),
-
-window_2h AS (
-    SELECT 1
-    FROM MyTable2
-    GROUP BY HOP(`ts`, INTERVAL '1' HOUR, INTERVAL '2' HOUR)
-)
-
-(SELECT * FROM window_1h)
-UNION ALL
-(SELECT * FROM window_2h)
-]]>
-    </Resource>
-    <Resource name="planBefore">
-      <![CDATA[
-LogicalUnion(all=[true])
-:- LogicalProject(EXPR$0=[1])
-:  +- LogicalAggregate(group=[{0}])
-:     +- LogicalProject($f0=[HOP($4, 3600000:INTERVAL HOUR, 3600000:INTERVAL 
HOUR)])
-:        +- LogicalTableScan(table=[[default_catalog, default_database, 
MyTable2, source: [TestTableSource(a, b, c, d, ts)]]])
-+- LogicalProject(EXPR$0=[1])
-   +- LogicalAggregate(group=[{0}])
-      +- LogicalProject($f0=[HOP($4, 3600000:INTERVAL HOUR, 7200000:INTERVAL 
HOUR)])
-         +- LogicalTableScan(table=[[default_catalog, default_database, 
MyTable2, source: [TestTableSource(a, b, c, d, ts)]]])
-]]>
-    </Resource>
-    <Resource name="planAfter">
-      <![CDATA[
-Union(all=[true], union=[EXPR$0])
-:- Calc(select=[1 AS EXPR$0])
-:  +- HashWindowAggregate(window=[SlidingGroupWindow('w$, ts, 3600000, 
3600000)], select=[])
-:     +- Exchange(distribution=[single])
-:        +- LocalHashWindowAggregate(window=[SlidingGroupWindow('w$, ts, 
3600000, 3600000)], select=[])
-:           +- Calc(select=[ts], reuse_id=[1])
-:              +- TableSourceScan(table=[[default_catalog, default_database, 
MyTable2, source: [TestTableSource(a, b, c, d, ts)]]], fields=[a, b, c, d, ts])
-+- Calc(select=[1 AS EXPR$0])
-   +- HashWindowAggregate(window=[SlidingGroupWindow('w$, ts, 7200000, 
3600000)], select=[])
-      +- Exchange(distribution=[single])
-         +- LocalHashWindowAggregate(window=[SlidingGroupWindow('w$, ts, 
7200000, 3600000)], select=[])
-            +- Reused(reference_id=[1])
-]]>
-    </Resource>
-  </TestCase>
   <TestCase name="testExpressionOnWindowHavingFunction[aggStrategy=AUTO]">
     <Resource name="sql">
       <![CDATA[
@@ -471,7 +323,7 @@ Calc(select=[EXPR$0, w$start AS EXPR$1, w$end AS EXPR$2])
       <![CDATA[
 LogicalProject(EXPR$0=[$1], EXPR$1=[$2])
 +- LogicalAggregate(group=[{0}], EXPR$0=[AVG($1)], EXPR$1=[SUM($2)])
-   +- LogicalProject($f0=[TUMBLE($1, 3000:INTERVAL SECOND)], c=[$2], a=[$0])
+   +- LogicalProject($f0=[$TUMBLE($1, 3000:INTERVAL SECOND)], c=[$2], a=[$0])
       +- LogicalTableScan(table=[[default_catalog, default_database, MyTable, 
source: [TestTableSource(a, b, c, d)]]])
 ]]>
     </Resource>
@@ -493,7 +345,7 @@ HashWindowAggregate(window=[TumblingGroupWindow('w$, b, 
3000)], select=[Final_AV
       <![CDATA[
 LogicalProject(EXPR$0=[$1], EXPR$1=[$2])
 +- LogicalAggregate(group=[{0}], EXPR$0=[AVG($1)], EXPR$1=[SUM($2)])
-   +- LogicalProject($f0=[TUMBLE($1, 3000:INTERVAL SECOND)], c=[$2], a=[$0])
+   +- LogicalProject($f0=[$TUMBLE($1, 3000:INTERVAL SECOND)], c=[$2], a=[$0])
       +- LogicalTableScan(table=[[default_catalog, default_database, MyTable, 
source: [TestTableSource(a, b, c, d)]]])
 ]]>
     </Resource>
@@ -514,7 +366,7 @@ HashWindowAggregate(window=[TumblingGroupWindow('w$, b, 
3000)], select=[AVG(c) A
       <![CDATA[
 LogicalProject(EXPR$0=[$1], EXPR$1=[$2])
 +- LogicalAggregate(group=[{0}], EXPR$0=[AVG($1)], EXPR$1=[SUM($2)])
-   +- LogicalProject($f0=[TUMBLE($1, 3000:INTERVAL SECOND)], c=[$2], a=[$0])
+   +- LogicalProject($f0=[$TUMBLE($1, 3000:INTERVAL SECOND)], c=[$2], a=[$0])
       +- LogicalTableScan(table=[[default_catalog, default_database, MyTable, 
source: [TestTableSource(a, b, c, d)]]])
 ]]>
     </Resource>
@@ -580,7 +432,7 @@ HashWindowAggregate(window=[SlidingGroupWindow('w$, ts, 
5400000, 900000)], selec
       <![CDATA[
 LogicalProject(sumA=[$1], cntB=[$2])
 +- LogicalAggregate(group=[{0}], sumA=[SUM($1)], cntB=[COUNT($2)])
-   +- LogicalProject($f0=[TUMBLE($4, 7200000:INTERVAL HOUR)], a=[$0], b=[$1])
+   +- LogicalProject($f0=[$TUMBLE($4, 7200000:INTERVAL HOUR)], a=[$0], b=[$1])
       +- LogicalTableScan(table=[[default_catalog, default_database, MyTable2, 
source: [TestTableSource(a, b, c, d, ts)]]])
 ]]>
     </Resource>
@@ -602,7 +454,7 @@ HashWindowAggregate(window=[TumblingGroupWindow('w$, ts, 
7200000)], select=[Fina
       <![CDATA[
 LogicalProject(sumA=[$1], cntB=[$2])
 +- LogicalAggregate(group=[{0}], sumA=[SUM($1)], cntB=[COUNT($2)])
-   +- LogicalProject($f0=[TUMBLE($4, 7200000:INTERVAL HOUR)], a=[$0], b=[$1])
+   +- LogicalProject($f0=[$TUMBLE($4, 7200000:INTERVAL HOUR)], a=[$0], b=[$1])
       +- LogicalTableScan(table=[[default_catalog, default_database, MyTable2, 
source: [TestTableSource(a, b, c, d, ts)]]])
 ]]>
     </Resource>
@@ -623,7 +475,7 @@ HashWindowAggregate(window=[TumblingGroupWindow('w$, ts, 
7200000)], select=[SUM(
       <![CDATA[
 LogicalProject(sumA=[$1], cntB=[$2])
 +- LogicalAggregate(group=[{0}], sumA=[SUM($1)], cntB=[COUNT($2)])
-   +- LogicalProject($f0=[TUMBLE($4, 7200000:INTERVAL HOUR)], a=[$0], b=[$1])
+   +- LogicalProject($f0=[$TUMBLE($4, 7200000:INTERVAL HOUR)], a=[$0], b=[$1])
       +- LogicalTableScan(table=[[default_catalog, default_database, MyTable2, 
source: [TestTableSource(a, b, c, d, ts)]]])
 ]]>
     </Resource>
@@ -742,7 +594,7 @@ FROM MyTable2
       <![CDATA[
 LogicalProject(EXPR$0=[TUMBLE_START($0)], EXPR$1=[TUMBLE_END($0)], 
EXPR$2=[TUMBLE_ROWTIME($0)], c=[$1], sumA=[$2], minB=[$3])
 +- LogicalAggregate(group=[{0, 1}], sumA=[SUM($2)], minB=[MIN($3)])
-   +- LogicalProject($f0=[TUMBLE($4, 240000:INTERVAL MINUTE)], c=[$2], a=[$0], 
b=[$1])
+   +- LogicalProject($f0=[$TUMBLE($4, 240000:INTERVAL MINUTE)], c=[$2], 
a=[$0], b=[$1])
       +- LogicalTableScan(table=[[default_catalog, default_database, MyTable2, 
source: [TestTableSource(a, b, c, d, ts)]]])
 ]]>
     </Resource>
@@ -774,7 +626,7 @@ FROM MyTable2
       <![CDATA[
 LogicalProject(EXPR$0=[TUMBLE_START($0)], EXPR$1=[TUMBLE_END($0)], 
EXPR$2=[TUMBLE_ROWTIME($0)], c=[$1], sumA=[$2], minB=[$3])
 +- LogicalAggregate(group=[{0, 1}], sumA=[SUM($2)], minB=[MIN($3)])
-   +- LogicalProject($f0=[TUMBLE($4, 240000:INTERVAL MINUTE)], c=[$2], a=[$0], 
b=[$1])
+   +- LogicalProject($f0=[$TUMBLE($4, 240000:INTERVAL MINUTE)], c=[$2], 
a=[$0], b=[$1])
       +- LogicalTableScan(table=[[default_catalog, default_database, MyTable2, 
source: [TestTableSource(a, b, c, d, ts)]]])
 ]]>
     </Resource>
@@ -805,7 +657,7 @@ FROM MyTable2
       <![CDATA[
 LogicalProject(EXPR$0=[TUMBLE_START($0)], EXPR$1=[TUMBLE_END($0)], 
EXPR$2=[TUMBLE_ROWTIME($0)], c=[$1], sumA=[$2], minB=[$3])
 +- LogicalAggregate(group=[{0, 1}], sumA=[SUM($2)], minB=[MIN($3)])
-   +- LogicalProject($f0=[TUMBLE($4, 240000:INTERVAL MINUTE)], c=[$2], a=[$0], 
b=[$1])
+   +- LogicalProject($f0=[$TUMBLE($4, 240000:INTERVAL MINUTE)], c=[$2], 
a=[$0], b=[$1])
       +- LogicalTableScan(table=[[default_catalog, default_database, MyTable2, 
source: [TestTableSource(a, b, c, d, ts)]]])
 ]]>
     </Resource>
@@ -841,7 +693,7 @@ GROUP BY TUMBLE(b, INTERVAL '15' MINUTE)
       <![CDATA[
 LogicalProject(s=[$1], a=[$2], wStart=[TUMBLE_START($0)])
 +- LogicalAggregate(group=[{0}], s=[SUM($1)], a=[AVG($1)])
-   +- LogicalProject($f0=[TUMBLE($1, 900000:INTERVAL MINUTE)], 
correct=[CASE(=($0, 1), 1, 99)])
+   +- LogicalProject($f0=[$TUMBLE($1, 900000:INTERVAL MINUTE)], 
correct=[CASE(=($0, 1), 1, 99)])
       +- LogicalTableScan(table=[[default_catalog, default_database, MyTable, 
source: [TestTableSource(a, b, c, d)]]])
 ]]>
     </Resource>
@@ -877,7 +729,7 @@ GROUP BY TUMBLE(b, INTERVAL '15' MINUTE)
       <![CDATA[
 LogicalProject(s=[$1], a=[$2], wStart=[TUMBLE_START($0)])
 +- LogicalAggregate(group=[{0}], s=[SUM($1)], a=[AVG($1)])
-   +- LogicalProject($f0=[TUMBLE($1, 900000:INTERVAL MINUTE)], 
correct=[CASE(=($0, 1), 1, 99)])
+   +- LogicalProject($f0=[$TUMBLE($1, 900000:INTERVAL MINUTE)], 
correct=[CASE(=($0, 1), 1, 99)])
       +- LogicalTableScan(table=[[default_catalog, default_database, MyTable, 
source: [TestTableSource(a, b, c, d)]]])
 ]]>
     </Resource>
@@ -912,7 +764,7 @@ GROUP BY TUMBLE(b, INTERVAL '15' MINUTE)
       <![CDATA[
 LogicalProject(s=[$1], a=[$2], wStart=[TUMBLE_START($0)])
 +- LogicalAggregate(group=[{0}], s=[SUM($1)], a=[AVG($1)])
-   +- LogicalProject($f0=[TUMBLE($1, 900000:INTERVAL MINUTE)], 
correct=[CASE(=($0, 1), 1, 99)])
+   +- LogicalProject($f0=[$TUMBLE($1, 900000:INTERVAL MINUTE)], 
correct=[CASE(=($0, 1), 1, 99)])
       +- LogicalTableScan(table=[[default_catalog, default_database, MyTable, 
source: [TestTableSource(a, b, c, d)]]])
 ]]>
     </Resource>
@@ -1260,7 +1112,7 @@ Calc(select=[EXPR$0])
       <![CDATA[
 LogicalProject(EXPR$0=[$2])
 +- LogicalAggregate(group=[{0, 1}], EXPR$0=[COUNT($2)])
-   +- LogicalProject(a=[$1], $f1=[TUMBLE($0, 3000:INTERVAL SECOND)], c=[$3])
+   +- LogicalProject(a=[$1], $f1=[$TUMBLE($0, 3000:INTERVAL SECOND)], c=[$3])
       +- LogicalTableScan(table=[[default_catalog, default_database, MyTable1, 
source: [TestTableSource(ts, a, b, c)]]])
 ]]>
     </Resource>
@@ -1283,7 +1135,7 @@ Calc(select=[EXPR$0])
       <![CDATA[
 LogicalProject(EXPR$0=[$2])
 +- LogicalAggregate(group=[{0, 1}], EXPR$0=[COUNT($2)])
-   +- LogicalProject(a=[$1], $f1=[TUMBLE($0, 3000:INTERVAL SECOND)], c=[$3])
+   +- LogicalProject(a=[$1], $f1=[$TUMBLE($0, 3000:INTERVAL SECOND)], c=[$3])
       +- LogicalTableScan(table=[[default_catalog, default_database, MyTable1, 
source: [TestTableSource(ts, a, b, c)]]])
 ]]>
     </Resource>
@@ -1305,7 +1157,7 @@ Calc(select=[EXPR$0])
       <![CDATA[
 LogicalProject(EXPR$0=[$2])
 +- LogicalAggregate(group=[{0, 1}], EXPR$0=[COUNT($2)])
-   +- LogicalProject(a=[$1], $f1=[TUMBLE($0, 3000:INTERVAL SECOND)], c=[$3])
+   +- LogicalProject(a=[$1], $f1=[$TUMBLE($0, 3000:INTERVAL SECOND)], c=[$3])
       +- LogicalTableScan(table=[[default_catalog, default_database, MyTable1, 
source: [TestTableSource(ts, a, b, c)]]])
 ]]>
     </Resource>
@@ -1328,7 +1180,7 @@ Calc(select=[EXPR$0])
       <![CDATA[
 LogicalProject(EXPR$0=[$3], EXPR$1=[$4])
 +- LogicalAggregate(group=[{0, 1, 2}], EXPR$0=[AVG($3)], EXPR$1=[COUNT($0)])
-   +- LogicalProject(a=[$0], d=[$3], $f2=[TUMBLE($1, 3000:INTERVAL SECOND)], 
c=[$2])
+   +- LogicalProject(a=[$0], d=[$3], $f2=[$TUMBLE($1, 3000:INTERVAL SECOND)], 
c=[$2])
       +- LogicalTableScan(table=[[default_catalog, default_database, MyTable, 
source: [TestTableSource(a, b, c, d)]]])
 ]]>
     </Resource>
@@ -1350,7 +1202,7 @@ Calc(select=[EXPR$0, EXPR$1])
       <![CDATA[
 LogicalProject(EXPR$0=[$3], EXPR$1=[$4])
 +- LogicalAggregate(group=[{0, 1, 2}], EXPR$0=[AVG($3)], EXPR$1=[COUNT($0)])
-   +- LogicalProject(a=[$0], d=[$3], $f2=[TUMBLE($1, 3000:INTERVAL SECOND)], 
c=[$2])
+   +- LogicalProject(a=[$0], d=[$3], $f2=[$TUMBLE($1, 3000:INTERVAL SECOND)], 
c=[$2])
       +- LogicalTableScan(table=[[default_catalog, default_database, MyTable, 
source: [TestTableSource(a, b, c, d)]]])
 ]]>
     </Resource>
@@ -1371,7 +1223,7 @@ Calc(select=[EXPR$0, EXPR$1])
       <![CDATA[
 LogicalProject(EXPR$0=[$3], EXPR$1=[$4])
 +- LogicalAggregate(group=[{0, 1, 2}], EXPR$0=[AVG($3)], EXPR$1=[COUNT($0)])
-   +- LogicalProject(a=[$0], d=[$3], $f2=[TUMBLE($1, 3000:INTERVAL SECOND)], 
c=[$2])
+   +- LogicalProject(a=[$0], d=[$3], $f2=[$TUMBLE($1, 3000:INTERVAL SECOND)], 
c=[$2])
       +- LogicalTableScan(table=[[default_catalog, default_database, MyTable, 
source: [TestTableSource(a, b, c, d)]]])
 ]]>
     </Resource>
@@ -1385,30 +1237,6 @@ Calc(select=[EXPR$0, EXPR$1])
 ]]>
     </Resource>
   </TestCase>
-  <TestCase name="testTumblingWindowWithUdAgg[aggStrategy=AUTO]">
-    <Resource name="sql">
-      <![CDATA[SELECT weightedAvg(b, a) AS wAvg FROM MyTable2 GROUP BY 
TUMBLE(ts, INTERVAL '4' MINUTE)]]>
-    </Resource>
-    <Resource name="planBefore">
-      <![CDATA[
-LogicalProject(wAvg=[$1])
-+- LogicalAggregate(group=[{0}], wAvg=[weightedAvg($1, $2)])
-   +- LogicalProject($f0=[TUMBLE($4, 240000:INTERVAL MINUTE)], b=[$1], a=[$0])
-      +- LogicalTableScan(table=[[default_catalog, default_database, MyTable2, 
source: [TestTableSource(a, b, c, d, ts)]]])
-]]>
-    </Resource>
-    <Resource name="planAfter">
-      <![CDATA[
-SortWindowAggregate(window=[TumblingGroupWindow('w$, ts, 240000)], 
select=[Final_weightedAvg(wAvg) AS wAvg])
-+- Sort(orderBy=[assignedWindow$ ASC])
-   +- Exchange(distribution=[single])
-      +- LocalSortWindowAggregate(window=[TumblingGroupWindow('w$, ts, 
240000)], select=[Partial_weightedAvg(b, a) AS wAvg])
-         +- Sort(orderBy=[ts ASC])
-            +- Calc(select=[ts, b, a])
-               +- TableSourceScan(table=[[default_catalog, default_database, 
MyTable2, source: [TestTableSource(a, b, c, d, ts)]]], fields=[a, b, c, d, ts])
-]]>
-    </Resource>
-  </TestCase>
   <TestCase name="testTumblingWindowSortAgg1[aggStrategy=AUTO]">
     <Resource name="sql">
       <![CDATA[SELECT MAX(c) FROM MyTable1 GROUP BY a, TUMBLE(ts, INTERVAL '3' 
SECOND)]]>
@@ -1417,7 +1245,7 @@ SortWindowAggregate(window=[TumblingGroupWindow('w$, ts, 
240000)], select=[Final
       <![CDATA[
 LogicalProject(EXPR$0=[$2])
 +- LogicalAggregate(group=[{0, 1}], EXPR$0=[MAX($2)])
-   +- LogicalProject(a=[$1], $f1=[TUMBLE($0, 3000:INTERVAL SECOND)], c=[$3])
+   +- LogicalProject(a=[$1], $f1=[$TUMBLE($0, 3000:INTERVAL SECOND)], c=[$3])
       +- LogicalTableScan(table=[[default_catalog, default_database, MyTable1, 
source: [TestTableSource(ts, a, b, c)]]])
 ]]>
     </Resource>
@@ -1442,7 +1270,7 @@ Calc(select=[EXPR$0])
       <![CDATA[
 LogicalProject(EXPR$0=[$2])
 +- LogicalAggregate(group=[{0, 1}], EXPR$0=[MAX($2)])
-   +- LogicalProject(a=[$1], $f1=[TUMBLE($0, 3000:INTERVAL SECOND)], c=[$3])
+   +- LogicalProject(a=[$1], $f1=[$TUMBLE($0, 3000:INTERVAL SECOND)], c=[$3])
       +- LogicalTableScan(table=[[default_catalog, default_database, MyTable1, 
source: [TestTableSource(ts, a, b, c)]]])
 ]]>
     </Resource>
@@ -1465,7 +1293,7 @@ Calc(select=[EXPR$0])
       <![CDATA[
 LogicalProject(EXPR$0=[$2])
 +- LogicalAggregate(group=[{0, 1}], EXPR$0=[MAX($2)])
-   +- LogicalProject(a=[$1], $f1=[TUMBLE($0, 3000:INTERVAL SECOND)], c=[$3])
+   +- LogicalProject(a=[$1], $f1=[$TUMBLE($0, 3000:INTERVAL SECOND)], c=[$3])
       +- LogicalTableScan(table=[[default_catalog, default_database, MyTable1, 
source: [TestTableSource(ts, a, b, c)]]])
 ]]>
     </Resource>
@@ -1490,7 +1318,7 @@ Calc(select=[EXPR$0])
       <![CDATA[
 LogicalProject(EXPR$0=[$3], EXPR$1=[$4])
 +- LogicalAggregate(group=[{0, 1, 2}], EXPR$0=[AVG($3)], EXPR$1=[countFun($0)])
-   +- LogicalProject(a=[$0], d=[$3], $f2=[TUMBLE($1, 3000:INTERVAL SECOND)], 
c=[$2])
+   +- LogicalProject(a=[$0], d=[$3], $f2=[$TUMBLE($1, 3000:INTERVAL SECOND)], 
c=[$2])
       +- LogicalTableScan(table=[[default_catalog, default_database, MyTable, 
source: [TestTableSource(a, b, c, d)]]])
 ]]>
     </Resource>
@@ -1512,7 +1340,7 @@ Calc(select=[EXPR$0, EXPR$1])
       <![CDATA[
 LogicalProject(EXPR$0=[$3], EXPR$1=[$4])
 +- LogicalAggregate(group=[{0, 1, 2}], EXPR$0=[AVG($3)], EXPR$1=[countFun($0)])
-   +- LogicalProject(a=[$0], d=[$3], $f2=[TUMBLE($1, 3000:INTERVAL SECOND)], 
c=[$2])
+   +- LogicalProject(a=[$0], d=[$3], $f2=[$TUMBLE($1, 3000:INTERVAL SECOND)], 
c=[$2])
       +- LogicalTableScan(table=[[default_catalog, default_database, MyTable, 
source: [TestTableSource(a, b, c, d)]]])
 ]]>
     </Resource>
@@ -1534,7 +1362,7 @@ Calc(select=[EXPR$0, EXPR$1])
       <![CDATA[
 LogicalProject(EXPR$0=[$3], EXPR$1=[$4])
 +- LogicalAggregate(group=[{0, 1, 2}], EXPR$0=[AVG($3)], EXPR$1=[countFun($0)])
-   +- LogicalProject(a=[$0], d=[$3], $f2=[TUMBLE($1, 3000:INTERVAL SECOND)], 
c=[$2])
+   +- LogicalProject(a=[$0], d=[$3], $f2=[$TUMBLE($1, 3000:INTERVAL SECOND)], 
c=[$2])
       +- LogicalTableScan(table=[[default_catalog, default_database, MyTable, 
source: [TestTableSource(a, b, c, d)]]])
 ]]>
     </Resource>
@@ -1550,26 +1378,27 @@ Calc(select=[EXPR$0, EXPR$1])
 ]]>
     </Resource>
   </TestCase>
-  <TestCase name="testWindowEndOnly[aggStrategy=AUTO]">
+  <TestCase name="testTumblingWindowWithUdAgg[aggStrategy=AUTO]">
     <Resource name="sql">
-      <![CDATA[SELECT TUMBLE_END(ts, INTERVAL '4' MINUTE) FROM MyTable2 GROUP 
BY TUMBLE(ts, INTERVAL '4' MINUTE), c]]>
+      <![CDATA[SELECT weightedAvg(b, a) AS wAvg FROM MyTable2 GROUP BY 
TUMBLE(ts, INTERVAL '4' MINUTE)]]>
     </Resource>
     <Resource name="planBefore">
       <![CDATA[
-LogicalProject(EXPR$0=[TUMBLE_END($0)])
-+- LogicalAggregate(group=[{0, 1}])
-   +- LogicalProject($f0=[TUMBLE($4, 240000:INTERVAL MINUTE)], c=[$2])
+LogicalProject(wAvg=[$1])
++- LogicalAggregate(group=[{0}], wAvg=[weightedAvg($1, $2)])
+   +- LogicalProject($f0=[$TUMBLE($4, 240000:INTERVAL MINUTE)], b=[$1], a=[$0])
       +- LogicalTableScan(table=[[default_catalog, default_database, MyTable2, 
source: [TestTableSource(a, b, c, d, ts)]]])
 ]]>
     </Resource>
     <Resource name="planAfter">
       <![CDATA[
-Calc(select=[w$end AS EXPR$0])
-+- HashWindowAggregate(groupBy=[c], window=[TumblingGroupWindow('w$, ts, 
240000)], properties=[w$start, w$end, w$rowtime], select=[c])
-   +- Exchange(distribution=[hash[c]])
-      +- LocalHashWindowAggregate(groupBy=[c], 
window=[TumblingGroupWindow('w$, ts, 240000)], properties=[w$start, w$end, 
w$rowtime], select=[c])
-         +- Calc(select=[ts, c])
-            +- TableSourceScan(table=[[default_catalog, default_database, 
MyTable2, source: [TestTableSource(a, b, c, d, ts)]]], fields=[a, b, c, d, ts])
+SortWindowAggregate(window=[TumblingGroupWindow('w$, ts, 240000)], 
select=[Final_weightedAvg(wAvg) AS wAvg])
++- Sort(orderBy=[assignedWindow$ ASC])
+   +- Exchange(distribution=[single])
+      +- LocalSortWindowAggregate(window=[TumblingGroupWindow('w$, ts, 
240000)], select=[Partial_weightedAvg(b, a) AS wAvg])
+         +- Sort(orderBy=[ts ASC])
+            +- Calc(select=[ts, b, a])
+               +- TableSourceScan(table=[[default_catalog, default_database, 
MyTable2, source: [TestTableSource(a, b, c, d, ts)]]], fields=[a, b, c, d, ts])
 ]]>
     </Resource>
   </TestCase>
@@ -1581,7 +1410,7 @@ Calc(select=[w$end AS EXPR$0])
       <![CDATA[
 LogicalProject(wAvg=[$1])
 +- LogicalAggregate(group=[{0}], wAvg=[weightedAvg($1, $2)])
-   +- LogicalProject($f0=[TUMBLE($4, 240000:INTERVAL MINUTE)], b=[$1], a=[$0])
+   +- LogicalProject($f0=[$TUMBLE($4, 240000:INTERVAL MINUTE)], b=[$1], a=[$0])
       +- LogicalTableScan(table=[[default_catalog, default_database, MyTable2, 
source: [TestTableSource(a, b, c, d, ts)]]])
 ]]>
     </Resource>
@@ -1595,6 +1424,151 @@ SortWindowAggregate(window=[TumblingGroupWindow('w$, 
ts, 240000)], select=[weigh
 ]]>
     </Resource>
   </TestCase>
+  <TestCase name="testTumblingWindowWithUdAgg[aggStrategy=TWO_PHASE]">
+    <Resource name="sql">
+      <![CDATA[SELECT weightedAvg(b, a) AS wAvg FROM MyTable2 GROUP BY 
TUMBLE(ts, INTERVAL '4' MINUTE)]]>
+    </Resource>
+    <Resource name="planBefore">
+      <![CDATA[
+LogicalProject(wAvg=[$1])
++- LogicalAggregate(group=[{0}], wAvg=[weightedAvg($1, $2)])
+   +- LogicalProject($f0=[$TUMBLE($4, 240000:INTERVAL MINUTE)], b=[$1], a=[$0])
+      +- LogicalTableScan(table=[[default_catalog, default_database, MyTable2, 
source: [TestTableSource(a, b, c, d, ts)]]])
+]]>
+    </Resource>
+    <Resource name="planAfter">
+      <![CDATA[
+SortWindowAggregate(window=[TumblingGroupWindow('w$, ts, 240000)], 
select=[Final_weightedAvg(wAvg) AS wAvg])
++- Sort(orderBy=[assignedWindow$ ASC])
+   +- Exchange(distribution=[single])
+      +- LocalSortWindowAggregate(window=[TumblingGroupWindow('w$, ts, 
240000)], select=[Partial_weightedAvg(b, a) AS wAvg])
+         +- Sort(orderBy=[ts ASC])
+            +- Calc(select=[ts, b, a])
+               +- TableSourceScan(table=[[default_catalog, default_database, 
MyTable2, source: [TestTableSource(a, b, c, d, ts)]]], fields=[a, b, c, d, ts])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testWindowAggregateWithDifferentWindows[aggStrategy=AUTO]">
+    <Resource name="sql">
+      <![CDATA[
+WITH window_1h AS (
+    SELECT 1
+    FROM MyTable2
+    GROUP BY HOP(`ts`, INTERVAL '1' HOUR, INTERVAL '1' HOUR)
+),
+
+window_2h AS (
+    SELECT 1
+    FROM MyTable2
+    GROUP BY HOP(`ts`, INTERVAL '1' HOUR, INTERVAL '2' HOUR)
+)
+
+(SELECT * FROM window_1h)
+UNION ALL
+(SELECT * FROM window_2h)
+]]>
+    </Resource>
+    <Resource name="planBefore">
+      <![CDATA[
+LogicalUnion(all=[true])
+:- LogicalProject(EXPR$0=[1])
+:  +- LogicalAggregate(group=[{0}])
+:     +- LogicalProject($f0=[HOP($4, 3600000:INTERVAL HOUR, 3600000:INTERVAL 
HOUR)])
+:        +- LogicalTableScan(table=[[default_catalog, default_database, 
MyTable2, source: [TestTableSource(a, b, c, d, ts)]]])
++- LogicalProject(EXPR$0=[1])
+   +- LogicalAggregate(group=[{0}])
+      +- LogicalProject($f0=[HOP($4, 3600000:INTERVAL HOUR, 7200000:INTERVAL 
HOUR)])
+         +- LogicalTableScan(table=[[default_catalog, default_database, 
MyTable2, source: [TestTableSource(a, b, c, d, ts)]]])
+]]>
+    </Resource>
+    <Resource name="planAfter">
+      <![CDATA[
+Union(all=[true], union=[EXPR$0])
+:- Calc(select=[1 AS EXPR$0])
+:  +- HashWindowAggregate(window=[SlidingGroupWindow('w$, ts, 3600000, 
3600000)], select=[])
+:     +- Exchange(distribution=[single])
+:        +- LocalHashWindowAggregate(window=[SlidingGroupWindow('w$, ts, 
3600000, 3600000)], select=[])
+:           +- Calc(select=[ts], reuse_id=[1])
+:              +- TableSourceScan(table=[[default_catalog, default_database, 
MyTable2, source: [TestTableSource(a, b, c, d, ts)]]], fields=[a, b, c, d, ts])
++- Calc(select=[1 AS EXPR$0])
+   +- HashWindowAggregate(window=[SlidingGroupWindow('w$, ts, 7200000, 
3600000)], select=[])
+      +- Exchange(distribution=[single])
+         +- LocalHashWindowAggregate(window=[SlidingGroupWindow('w$, ts, 
7200000, 3600000)], select=[])
+            +- Reused(reference_id=[1])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testWindowEndOnly[aggStrategy=AUTO]">
+    <Resource name="sql">
+      <![CDATA[SELECT TUMBLE_END(ts, INTERVAL '4' MINUTE) FROM MyTable2 GROUP 
BY TUMBLE(ts, INTERVAL '4' MINUTE), c]]>
+    </Resource>
+    <Resource name="planBefore">
+      <![CDATA[
+LogicalProject(EXPR$0=[TUMBLE_END($0)])
++- LogicalAggregate(group=[{0, 1}])
+   +- LogicalProject($f0=[$TUMBLE($4, 240000:INTERVAL MINUTE)], c=[$2])
+      +- LogicalTableScan(table=[[default_catalog, default_database, MyTable2, 
source: [TestTableSource(a, b, c, d, ts)]]])
+]]>
+    </Resource>
+    <Resource name="planAfter">
+      <![CDATA[
+Calc(select=[w$end AS EXPR$0])
++- HashWindowAggregate(groupBy=[c], window=[TumblingGroupWindow('w$, ts, 
240000)], properties=[w$start, w$end, w$rowtime], select=[c])
+   +- Exchange(distribution=[hash[c]])
+      +- LocalHashWindowAggregate(groupBy=[c], 
window=[TumblingGroupWindow('w$, ts, 240000)], properties=[w$start, w$end, 
w$rowtime], select=[c])
+         +- Calc(select=[ts, c])
+            +- TableSourceScan(table=[[default_catalog, default_database, 
MyTable2, source: [TestTableSource(a, b, c, d, ts)]]], fields=[a, b, c, d, ts])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase 
name="testWindowAggregateWithDifferentWindows[aggStrategy=ONE_PHASE]">
+    <Resource name="sql">
+      <![CDATA[
+WITH window_1h AS (
+    SELECT 1
+    FROM MyTable2
+    GROUP BY HOP(`ts`, INTERVAL '1' HOUR, INTERVAL '1' HOUR)
+),
+
+window_2h AS (
+    SELECT 1
+    FROM MyTable2
+    GROUP BY HOP(`ts`, INTERVAL '1' HOUR, INTERVAL '2' HOUR)
+)
+
+(SELECT * FROM window_1h)
+UNION ALL
+(SELECT * FROM window_2h)
+]]>
+    </Resource>
+    <Resource name="planBefore">
+      <![CDATA[
+LogicalUnion(all=[true])
+:- LogicalProject(EXPR$0=[1])
+:  +- LogicalAggregate(group=[{0}])
+:     +- LogicalProject($f0=[HOP($4, 3600000:INTERVAL HOUR, 3600000:INTERVAL 
HOUR)])
+:        +- LogicalTableScan(table=[[default_catalog, default_database, 
MyTable2, source: [TestTableSource(a, b, c, d, ts)]]])
++- LogicalProject(EXPR$0=[1])
+   +- LogicalAggregate(group=[{0}])
+      +- LogicalProject($f0=[HOP($4, 3600000:INTERVAL HOUR, 7200000:INTERVAL 
HOUR)])
+         +- LogicalTableScan(table=[[default_catalog, default_database, 
MyTable2, source: [TestTableSource(a, b, c, d, ts)]]])
+]]>
+    </Resource>
+    <Resource name="planAfter">
+      <![CDATA[
+Union(all=[true], union=[EXPR$0])
+:- Calc(select=[1 AS EXPR$0])
+:  +- HashWindowAggregate(window=[SlidingGroupWindow('w$, ts, 3600000, 
3600000)], select=[])
+:     +- Exchange(distribution=[single], reuse_id=[1])
+:        +- Calc(select=[ts])
+:           +- TableSourceScan(table=[[default_catalog, default_database, 
MyTable2, source: [TestTableSource(a, b, c, d, ts)]]], fields=[a, b, c, d, ts])
++- Calc(select=[1 AS EXPR$0])
+   +- SortWindowAggregate(window=[SlidingGroupWindow('w$, ts, 7200000, 
3600000)], select=[])
+      +- Sort(orderBy=[ts ASC])
+         +- Reused(reference_id=[1])
+]]>
+    </Resource>
+  </TestCase>
   <TestCase name="testWindowEndOnly[aggStrategy=ONE_PHASE]">
     <Resource name="sql">
       <![CDATA[SELECT TUMBLE_END(ts, INTERVAL '4' MINUTE) FROM MyTable2 GROUP 
BY TUMBLE(ts, INTERVAL '4' MINUTE), c]]>
@@ -1603,7 +1577,7 @@ SortWindowAggregate(window=[TumblingGroupWindow('w$, ts, 
240000)], select=[weigh
       <![CDATA[
 LogicalProject(EXPR$0=[TUMBLE_END($0)])
 +- LogicalAggregate(group=[{0, 1}])
-   +- LogicalProject($f0=[TUMBLE($4, 240000:INTERVAL MINUTE)], c=[$2])
+   +- LogicalProject($f0=[$TUMBLE($4, 240000:INTERVAL MINUTE)], c=[$2])
       +- LogicalTableScan(table=[[default_catalog, default_database, MyTable2, 
source: [TestTableSource(a, b, c, d, ts)]]])
 ]]>
     </Resource>
@@ -1617,27 +1591,53 @@ Calc(select=[w$end AS EXPR$0])
 ]]>
     </Resource>
   </TestCase>
-  <TestCase name="testTumblingWindowWithUdAgg[aggStrategy=TWO_PHASE]">
+  <TestCase 
name="testWindowAggregateWithDifferentWindows[aggStrategy=TWO_PHASE]">
     <Resource name="sql">
-      <![CDATA[SELECT weightedAvg(b, a) AS wAvg FROM MyTable2 GROUP BY 
TUMBLE(ts, INTERVAL '4' MINUTE)]]>
+      <![CDATA[
+WITH window_1h AS (
+    SELECT 1
+    FROM MyTable2
+    GROUP BY HOP(`ts`, INTERVAL '1' HOUR, INTERVAL '1' HOUR)
+),
+
+window_2h AS (
+    SELECT 1
+    FROM MyTable2
+    GROUP BY HOP(`ts`, INTERVAL '1' HOUR, INTERVAL '2' HOUR)
+)
+
+(SELECT * FROM window_1h)
+UNION ALL
+(SELECT * FROM window_2h)
+]]>
     </Resource>
     <Resource name="planBefore">
       <![CDATA[
-LogicalProject(wAvg=[$1])
-+- LogicalAggregate(group=[{0}], wAvg=[weightedAvg($1, $2)])
-   +- LogicalProject($f0=[TUMBLE($4, 240000:INTERVAL MINUTE)], b=[$1], a=[$0])
-      +- LogicalTableScan(table=[[default_catalog, default_database, MyTable2, 
source: [TestTableSource(a, b, c, d, ts)]]])
+LogicalUnion(all=[true])
+:- LogicalProject(EXPR$0=[1])
+:  +- LogicalAggregate(group=[{0}])
+:     +- LogicalProject($f0=[HOP($4, 3600000:INTERVAL HOUR, 3600000:INTERVAL 
HOUR)])
+:        +- LogicalTableScan(table=[[default_catalog, default_database, 
MyTable2, source: [TestTableSource(a, b, c, d, ts)]]])
++- LogicalProject(EXPR$0=[1])
+   +- LogicalAggregate(group=[{0}])
+      +- LogicalProject($f0=[HOP($4, 3600000:INTERVAL HOUR, 7200000:INTERVAL 
HOUR)])
+         +- LogicalTableScan(table=[[default_catalog, default_database, 
MyTable2, source: [TestTableSource(a, b, c, d, ts)]]])
 ]]>
     </Resource>
     <Resource name="planAfter">
       <![CDATA[
-SortWindowAggregate(window=[TumblingGroupWindow('w$, ts, 240000)], 
select=[Final_weightedAvg(wAvg) AS wAvg])
-+- Sort(orderBy=[assignedWindow$ ASC])
-   +- Exchange(distribution=[single])
-      +- LocalSortWindowAggregate(window=[TumblingGroupWindow('w$, ts, 
240000)], select=[Partial_weightedAvg(b, a) AS wAvg])
-         +- Sort(orderBy=[ts ASC])
-            +- Calc(select=[ts, b, a])
-               +- TableSourceScan(table=[[default_catalog, default_database, 
MyTable2, source: [TestTableSource(a, b, c, d, ts)]]], fields=[a, b, c, d, ts])
+Union(all=[true], union=[EXPR$0])
+:- Calc(select=[1 AS EXPR$0])
+:  +- HashWindowAggregate(window=[SlidingGroupWindow('w$, ts, 3600000, 
3600000)], select=[])
+:     +- Exchange(distribution=[single])
+:        +- LocalHashWindowAggregate(window=[SlidingGroupWindow('w$, ts, 
3600000, 3600000)], select=[])
+:           +- Calc(select=[ts], reuse_id=[1])
+:              +- TableSourceScan(table=[[default_catalog, default_database, 
MyTable2, source: [TestTableSource(a, b, c, d, ts)]]], fields=[a, b, c, d, ts])
++- Calc(select=[1 AS EXPR$0])
+   +- HashWindowAggregate(window=[SlidingGroupWindow('w$, ts, 7200000, 
3600000)], select=[])
+      +- Exchange(distribution=[single])
+         +- LocalHashWindowAggregate(window=[SlidingGroupWindow('w$, ts, 
7200000, 3600000)], select=[])
+            +- Reused(reference_id=[1])
 ]]>
     </Resource>
   </TestCase>
@@ -1649,7 +1649,7 @@ SortWindowAggregate(window=[TumblingGroupWindow('w$, ts, 
240000)], select=[Final
       <![CDATA[
 LogicalProject(EXPR$0=[TUMBLE_END($0)])
 +- LogicalAggregate(group=[{0, 1}])
-   +- LogicalProject($f0=[TUMBLE($4, 240000:INTERVAL MINUTE)], c=[$2])
+   +- LogicalProject($f0=[$TUMBLE($4, 240000:INTERVAL MINUTE)], c=[$2])
       +- LogicalTableScan(table=[[default_catalog, default_database, MyTable2, 
source: [TestTableSource(a, b, c, d, ts)]]])
 ]]>
     </Resource>
diff --git 
a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/AggregateReduceGroupingRuleTest.xml
 
b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/AggregateReduceGroupingRuleTest.xml
index 14b9c6e..09639d3 100644
--- 
a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/AggregateReduceGroupingRuleTest.xml
+++ 
b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/AggregateReduceGroupingRuleTest.xml
@@ -324,7 +324,7 @@ FlinkLogicalAggregate(group=[{0, 1, 2}], 
b1=[AUXILIARY_GROUP($3)], EXPR$4=[COUNT
       <![CDATA[
 LogicalProject(a4=[$0], b4=[$1], EXPR$2=[$3])
 +- LogicalAggregate(group=[{0, 1, 2}], EXPR$2=[COUNT($3)])
-   +- LogicalProject(a4=[$0], b4=[$1], $f2=[TUMBLE($3, 900000:INTERVAL 
MINUTE)], c4=[$2])
+   +- LogicalProject(a4=[$0], b4=[$1], $f2=[$TUMBLE($3, 900000:INTERVAL 
MINUTE)], c4=[$2])
       +- LogicalTableScan(table=[[default_catalog, default_database, T4, 
source: [TestTableSource(a4, b4, c4, d4)]]])
 ]]>
     </Resource>
@@ -343,7 +343,7 @@ FlinkLogicalWindowAggregate(group=[{0}], 
b4=[AUXILIARY_GROUP($1)], EXPR$2=[COUNT
       <![CDATA[
 LogicalProject(a4=[$0], c4=[$1], EXPR$2=[$3], EXPR$3=[$4])
 +- LogicalAggregate(group=[{0, 1, 2}], EXPR$2=[COUNT($3)], EXPR$3=[AVG($3)])
-   +- LogicalProject(a4=[$0], c4=[$2], $f2=[TUMBLE($3, 900000:INTERVAL 
MINUTE)], b4=[$1])
+   +- LogicalProject(a4=[$0], c4=[$2], $f2=[$TUMBLE($3, 900000:INTERVAL 
MINUTE)], b4=[$1])
       +- LogicalTableScan(table=[[default_catalog, default_database, T4, 
source: [TestTableSource(a4, b4, c4, d4)]]])
 ]]>
     </Resource>
@@ -363,7 +363,7 @@ FlinkLogicalWindowAggregate(group=[{0}], 
c4=[AUXILIARY_GROUP($2)], EXPR$2=[COUNT
 LogicalAggregate(group=[{0, 1, 2}], EXPR$3=[COUNT($3)])
 +- LogicalProject(a4=[$0], c4=[$1], s=[TUMBLE_START($2)], b4=[$3])
    +- LogicalAggregate(group=[{0, 1, 2}], b4=[VAR_POP($3)])
-      +- LogicalProject(a4=[$0], c4=[$2], $f2=[TUMBLE($3, 900000:INTERVAL 
MINUTE)], b4=[$1])
+      +- LogicalProject(a4=[$0], c4=[$2], $f2=[$TUMBLE($3, 900000:INTERVAL 
MINUTE)], b4=[$1])
          +- LogicalTableScan(table=[[default_catalog, default_database, T4, 
source: [TestTableSource(a4, b4, c4, d4)]]])
 ]]>
     </Resource>
@@ -387,7 +387,7 @@ FlinkLogicalCalc(select=[a4, c4, s, EXPR$3])
 LogicalAggregate(group=[{0, 1, 2}], EXPR$3=[COUNT($3)])
 +- LogicalProject(a4=[$0], c4=[$1], e=[TUMBLE_END($2)], b4=[$3])
    +- LogicalAggregate(group=[{0, 1, 2}], b4=[VAR_POP($3)])
-      +- LogicalProject(a4=[$0], c4=[$2], $f2=[TUMBLE($3, 900000:INTERVAL 
MINUTE)], b4=[$1])
+      +- LogicalProject(a4=[$0], c4=[$2], $f2=[$TUMBLE($3, 900000:INTERVAL 
MINUTE)], b4=[$1])
          +- LogicalTableScan(table=[[default_catalog, default_database, T4, 
source: [TestTableSource(a4, b4, c4, d4)]]])
 ]]>
     </Resource>
@@ -411,7 +411,7 @@ FlinkLogicalCalc(select=[a4, c4, e, EXPR$3])
 LogicalAggregate(group=[{0, 1, 2}], EXPR$3=[COUNT()])
 +- LogicalProject(a4=[$0], b4=[$3], c4=[$1])
    +- LogicalAggregate(group=[{0, 1, 2}], b4=[VAR_POP($3)])
-      +- LogicalProject(a4=[$0], c4=[$2], $f2=[TUMBLE($3, 900000:INTERVAL 
MINUTE)], b4=[$1])
+      +- LogicalProject(a4=[$0], c4=[$2], $f2=[$TUMBLE($3, 900000:INTERVAL 
MINUTE)], b4=[$1])
          +- LogicalTableScan(table=[[default_catalog, default_database, T4, 
source: [TestTableSource(a4, b4, c4, d4)]]])
 ]]>
     </Resource>
@@ -439,7 +439,7 @@ LogicalAggregate(group=[{0, 1, 2}], groups=[[{0, 1, 2}, {0, 
1}, {0, 2}, {0}, {1,
       <![CDATA[
 FlinkLogicalCalc(select=[a1, b1, c1, EXPR$3])
 +- FlinkLogicalAggregate(group=[{0, 1, 2, 4}], EXPR$3=[COUNT($3)])
-   +- FlinkLogicalExpand(projects=[{a1=[$0], b1=[$1], c1=[$2], d1=[$3], 
$e=[0]}, {a1=[$0], b1=[$1], c1=[null], d1=[$3], $e=[1]}, {a1=[$0], b1=[null], 
c1=[$2], d1=[$3], $e=[2]}, {a1=[$0], b1=[null], c1=[null], d1=[$3], $e=[3]}, 
{a1=[null], b1=[$1], c1=[$2], d1=[$3], $e=[4]}, {a1=[null], b1=[$1], c1=[null], 
d1=[$3], $e=[5]}, {a1=[null], b1=[null], c1=[$2], d1=[$3], $e=[6]}, {a1=[null], 
b1=[null], c1=[null], d1=[$3], $e=[7]}])
+   +- FlinkLogicalExpand(projects=[a1, b1, c1, d1, $e])
       +- FlinkLogicalTableSourceScan(table=[[default_catalog, 
default_database, T1, source: [TestTableSource(a1, b1, c1, d1)]]], fields=[a1, 
b1, c1, d1])
 ]]>
     </Resource>
@@ -458,7 +458,7 @@ LogicalAggregate(group=[{0, 1, 2}], groups=[[{0, 1}, {0, 
2}]], EXPR$3=[COUNT($3)
       <![CDATA[
 FlinkLogicalCalc(select=[a1, b1, c1, EXPR$3])
 +- FlinkLogicalAggregate(group=[{0, 1, 2, 4}], EXPR$3=[COUNT($3)])
-   +- FlinkLogicalExpand(projects=[{a1=[$0], b1=[$1], c1=[null], d1=[$3], 
$e=[1]}, {a1=[$0], b1=[null], c1=[$2], d1=[$3], $e=[2]}])
+   +- FlinkLogicalExpand(projects=[a1, b1, c1, d1, $e])
       +- FlinkLogicalTableSourceScan(table=[[default_catalog, 
default_database, T1, source: [TestTableSource(a1, b1, c1, d1)]]], fields=[a1, 
b1, c1, d1])
 ]]>
     </Resource>
@@ -479,7 +479,7 @@ LogicalProject(a1=[$0], s=[$2])
       <![CDATA[
 FlinkLogicalCalc(select=[a1, s])
 +- FlinkLogicalAggregate(group=[{0, 1, 3}], s=[SUM($2)])
-   +- FlinkLogicalExpand(projects=[{a1=[$0], c1=[$1], b1=[$2], $e=[0]}, 
{a1=[$0], c1=[null], b1=[$2], $e=[1]}, {a1=[null], c1=[null], b1=[$2], $e=[3]}])
+   +- FlinkLogicalExpand(projects=[a1, c1, b1, $e])
       +- FlinkLogicalCalc(select=[a1, c1, b1])
          +- FlinkLogicalTableSourceScan(table=[[default_catalog, 
default_database, T1, source: [TestTableSource(a1, b1, c1, d1)]]], fields=[a1, 
b1, c1, d1])
 ]]>
@@ -500,7 +500,7 @@ LogicalProject(a1=[$0], b1=[$1], c1=[$2], EXPR$3=[$4])
       <![CDATA[
 FlinkLogicalCalc(select=[a1, b1, c1, EXPR$3])
 +- FlinkLogicalAggregate(group=[{0, 2, 3, 4}], b1=[AUXILIARY_GROUP($1)], 
EXPR$3=[COUNT($5)])
-   +- FlinkLogicalExpand(projects=[{a1=[$0], b1=[$1], c1=[$2], d1=[null], 
$e=[1], d1_0=[$3]}, {a1=[$0], b1=[$1], c1=[null], d1=[$3], $e=[2], d1_0=[$3]}])
+   +- FlinkLogicalExpand(projects=[a1, b1, c1, d1, $e, d1_0])
       +- FlinkLogicalTableSourceScan(table=[[default_catalog, 
default_database, T1, source: [TestTableSource(a1, b1, c1, d1)]]], fields=[a1, 
b1, c1, d1])
 ]]>
     </Resource>
@@ -557,7 +557,7 @@ LogicalAggregate(group=[{0, 1, 2}], groups=[[{0, 1, 2}, {0, 
1}, {0}, {}]], EXPR$
       <![CDATA[
 FlinkLogicalCalc(select=[a1, b1, c1, EXPR$3])
 +- FlinkLogicalAggregate(group=[{0, 1, 2, 4}], EXPR$3=[COUNT($3)])
-   +- FlinkLogicalExpand(projects=[{a1=[$0], b1=[$1], c1=[$2], d1=[$3], 
$e=[0]}, {a1=[$0], b1=[$1], c1=[null], d1=[$3], $e=[1]}, {a1=[$0], b1=[null], 
c1=[null], d1=[$3], $e=[3]}, {a1=[null], b1=[null], c1=[null], d1=[$3], 
$e=[7]}])
+   +- FlinkLogicalExpand(projects=[a1, b1, c1, d1, $e])
       +- FlinkLogicalTableSourceScan(table=[[default_catalog, 
default_database, T1, source: [TestTableSource(a1, b1, c1, d1)]]], fields=[a1, 
b1, c1, d1])
 ]]>
     </Resource>
@@ -635,7 +635,7 @@ LogicalAggregate(group=[{0, 1}], EXPR$2=[COUNT(DISTINCT 
$2)], EXPR$3=[SUM(DISTIN
       <![CDATA[
 FlinkLogicalAggregate(group=[{0}], d1=[AUXILIARY_GROUP($3)], EXPR$2=[COUNT($2) 
FILTER $5], EXPR$3=[SUM($1) FILTER $4])
 +- FlinkLogicalCalc(select=[a1, b1, c1, d1, =(CASE(=($e, 2:BIGINT), 2:BIGINT, 
4:BIGINT), 2) AS $g_2, =(CASE(=($e, 2:BIGINT), 2:BIGINT, 4:BIGINT), 4) AS $g_4])
-   +- FlinkLogicalExpand(projects=[{a1=[$0], b1=[$1], c1=[null], d1=[$3], 
$e=[2]}, {a1=[$0], b1=[null], c1=[$2], d1=[$3], $e=[4]}])
+   +- FlinkLogicalExpand(projects=[a1, b1, c1, d1, $e])
       +- FlinkLogicalTableSourceScan(table=[[default_catalog, 
default_database, T1, source: [TestTableSource(a1, b1, c1, d1)]]], fields=[a1, 
b1, c1, d1])
 ]]>
     </Resource>
@@ -656,7 +656,7 @@ LogicalAggregate(group=[{0}], EXPR$1=[SUM(DISTINCT $1)], 
EXPR$2=[MAX($1)], EXPR$
 FlinkLogicalAggregate(group=[{0}], EXPR$1=[SUM($1) FILTER $4], EXPR$2=[MIN($2) 
FILTER $5], EXPR$3=[MIN($3) FILTER $5])
 +- FlinkLogicalCalc(select=[a1, b1, EXPR$2, EXPR$3, =(CASE(=($e, 0:BIGINT), 
0:BIGINT, 1:BIGINT), 0) AS $g_0, =(CASE(=($e, 0:BIGINT), 0:BIGINT, 1:BIGINT), 
1) AS $g_1])
    +- FlinkLogicalAggregate(group=[{0, 1, 3}], EXPR$2=[MAX($4)], 
EXPR$3=[MIN($2)])
-      +- FlinkLogicalExpand(projects=[{a1=[$0], b1=[$1], c1=[$2], $e=[0], 
b1_0=[$1]}, {a1=[$0], b1=[null], c1=[$2], $e=[1], b1_0=[$1]}])
+      +- FlinkLogicalExpand(projects=[a1, b1, c1, $e, b1_0])
          +- FlinkLogicalCalc(select=[a1, b1, c1])
             +- FlinkLogicalTableSourceScan(table=[[default_catalog, 
default_database, T1, source: [TestTableSource(a1, b1, c1, d1)]]], fields=[a1, 
b1, c1, d1])
 ]]>
@@ -678,7 +678,7 @@ LogicalAggregate(group=[{0, 1}], EXPR$2=[COUNT(DISTINCT 
$2)], EXPR$3=[MAX($3)],
 FlinkLogicalAggregate(group=[{0}], d1=[MIN($2) FILTER $6], EXPR$2=[COUNT($1) 
FILTER $5], EXPR$3=[MIN($3) FILTER $6], EXPR$4=[MIN($4) FILTER $6])
 +- FlinkLogicalCalc(select=[a1, c1, d1, EXPR$3, EXPR$4, =(CASE(=($e, 
0:BIGINT), 0:BIGINT, 1:BIGINT), 0) AS $g_0, =(CASE(=($e, 0:BIGINT), 0:BIGINT, 
1:BIGINT), 1) AS $g_1])
    +- FlinkLogicalAggregate(group=[{0, 2, 4}], d1=[AUXILIARY_GROUP($3)], 
EXPR$3=[MAX($1)], EXPR$4=[SUM($1)])
-      +- FlinkLogicalExpand(projects=[{a1=[$0], b1=[$1], c1=[$2], d1=[$3], 
$e=[0]}, {a1=[$0], b1=[$1], c1=[null], d1=[$3], $e=[1]}])
+      +- FlinkLogicalExpand(projects=[a1, b1, c1, d1, $e])
          +- FlinkLogicalTableSourceScan(table=[[default_catalog, 
default_database, T1, source: [TestTableSource(a1, b1, c1, d1)]]], fields=[a1, 
b1, c1, d1])
 ]]>
     </Resource>
@@ -698,8 +698,8 @@ LogicalProject(EXPR$0=[$2])
     <Resource name="planAfter">
       <![CDATA[
 FlinkLogicalCalc(select=[EXPR$0])
-+- FlinkLogicalAggregate(group=[{0}], EXPR$0=[COUNT($2)])
-   +- FlinkLogicalCalc(select=[1 AS $f0, true AS $f1, c1])
++- FlinkLogicalAggregate(group=[{0}], EXPR$0=[COUNT($1)])
+   +- FlinkLogicalCalc(select=[1 AS $f0, c1])
       +- FlinkLogicalTableSourceScan(table=[[default_catalog, 
default_database, T1, source: [TestTableSource(a1, b1, c1, d1)]]], fields=[a1, 
b1, c1, d1])
 ]]>
     </Resource>
@@ -774,8 +774,8 @@ LogicalProject(a1=[$0], b1=[$1], EXPR$2=[$4])
     </Resource>
     <Resource name="planAfter">
       <![CDATA[
-FlinkLogicalAggregate(group=[{0}], b1=[AUXILIARY_GROUP($1)], 
EXPR$2=[COUNT($4)])
-+- FlinkLogicalCalc(select=[a1, b1, 1 AS $f2, true AS $f3, c1])
+FlinkLogicalAggregate(group=[{0}], b1=[AUXILIARY_GROUP($1)], 
EXPR$2=[COUNT($2)])
++- FlinkLogicalCalc(select=[a1, b1, c1])
    +- FlinkLogicalTableSourceScan(table=[[default_catalog, default_database, 
T1, source: [TestTableSource(a1, b1, c1, d1)]]], fields=[a1, b1, c1, d1])
 ]]>
     </Resource>
@@ -836,7 +836,7 @@ LogicalAggregate(group=[{0}], EXPR$1=[COUNT(DISTINCT $1)], 
EXPR$2=[SUM($1)])
 FlinkLogicalAggregate(group=[{0}], EXPR$1=[COUNT($1) FILTER $3], 
EXPR$2=[MIN($2) FILTER $4])
 +- FlinkLogicalCalc(select=[a1, b1, EXPR$2, =(CASE(=($e, 0:BIGINT), 0:BIGINT, 
1:BIGINT), 0) AS $g_0, =(CASE(=($e, 0:BIGINT), 0:BIGINT, 1:BIGINT), 1) AS $g_1])
    +- FlinkLogicalAggregate(group=[{0, 1, 2}], EXPR$2=[SUM($3)])
-      +- FlinkLogicalExpand(projects=[{a1=[$0], b1=[$1], $e=[0], b1_0=[$1]}, 
{a1=[$0], b1=[null], $e=[1], b1_0=[$1]}])
+      +- FlinkLogicalExpand(projects=[a1, b1, $e, b1_0])
          +- FlinkLogicalCalc(select=[a1, b1])
             +- FlinkLogicalTableSourceScan(table=[[default_catalog, 
default_database, T1, source: [TestTableSource(a1, b1, c1, d1)]]], fields=[a1, 
b1, c1, d1])
 ]]>
@@ -858,7 +858,7 @@ LogicalAggregate(group=[{0, 1}], EXPR$2=[COUNT(DISTINCT 
$2)], EXPR$3=[SUM($2)])
 FlinkLogicalAggregate(group=[{0}], c1=[AUXILIARY_GROUP($1)], EXPR$2=[COUNT($2) 
FILTER $4], EXPR$3=[MIN($3) FILTER $5])
 +- FlinkLogicalCalc(select=[a1, c1, b1, EXPR$3, =(CASE(=($e, 0:BIGINT), 
0:BIGINT, 1:BIGINT), 0) AS $g_0, =(CASE(=($e, 0:BIGINT), 0:BIGINT, 1:BIGINT), 
1) AS $g_1])
    +- FlinkLogicalAggregate(group=[{0, 2, 3}], c1=[AUXILIARY_GROUP($1)], 
EXPR$3=[SUM($4)])
-      +- FlinkLogicalExpand(projects=[{a1=[$0], c1=[$1], b1=[$2], $e=[0], 
b1_0=[$2]}, {a1=[$0], c1=[$1], b1=[null], $e=[1], b1_0=[$2]}])
+      +- FlinkLogicalExpand(projects=[a1, c1, b1, $e, b1_0])
          +- FlinkLogicalCalc(select=[a1, c1, b1])
             +- FlinkLogicalTableSourceScan(table=[[default_catalog, 
default_database, T1, source: [TestTableSource(a1, b1, c1, d1)]]], fields=[a1, 
b1, c1, d1])
 ]]>
@@ -880,7 +880,7 @@ LogicalAggregate(group=[{0}], EXPR$1=[COUNT(DISTINCT $1)], 
EXPR$2=[SUM($2)])
 FlinkLogicalAggregate(group=[{0}], EXPR$1=[COUNT($1) FILTER $3], 
EXPR$2=[MIN($2) FILTER $4])
 +- FlinkLogicalCalc(select=[a1, c1, EXPR$2, =(CASE(=($e, 0:BIGINT), 0:BIGINT, 
1:BIGINT), 0) AS $g_0, =(CASE(=($e, 0:BIGINT), 0:BIGINT, 1:BIGINT), 1) AS $g_1])
    +- FlinkLogicalAggregate(group=[{0, 1, 3}], EXPR$2=[SUM($2)])
-      +- FlinkLogicalExpand(projects=[{a1=[$0], c1=[$1], b1=[$2], $e=[0]}, 
{a1=[$0], c1=[null], b1=[$2], $e=[1]}])
+      +- FlinkLogicalExpand(projects=[a1, c1, b1, $e])
          +- FlinkLogicalCalc(select=[a1, c1, b1])
             +- FlinkLogicalTableSourceScan(table=[[default_catalog, 
default_database, T1, source: [TestTableSource(a1, b1, c1, d1)]]], fields=[a1, 
b1, c1, d1])
 ]]>
@@ -902,7 +902,7 @@ LogicalAggregate(group=[{0, 1}], EXPR$2=[COUNT(DISTINCT 
$2)], EXPR$3=[SUM($3)])
 FlinkLogicalAggregate(group=[{0}], d1=[MIN($2) FILTER $5], EXPR$2=[COUNT($1) 
FILTER $4], EXPR$3=[MIN($3) FILTER $5])
 +- FlinkLogicalCalc(select=[a1, c1, d1, EXPR$3, =(CASE(=($e, 0:BIGINT), 
0:BIGINT, 1:BIGINT), 0) AS $g_0, =(CASE(=($e, 0:BIGINT), 0:BIGINT, 1:BIGINT), 
1) AS $g_1])
    +- FlinkLogicalAggregate(group=[{0, 2, 4}], d1=[AUXILIARY_GROUP($3)], 
EXPR$3=[SUM($1)])
-      +- FlinkLogicalExpand(projects=[{a1=[$0], b1=[$1], c1=[$2], d1=[$3], 
$e=[0]}, {a1=[$0], b1=[$1], c1=[null], d1=[$3], $e=[1]}])
+      +- FlinkLogicalExpand(projects=[a1, b1, c1, d1, $e])
          +- FlinkLogicalTableSourceScan(table=[[default_catalog, 
default_database, T1, source: [TestTableSource(a1, b1, c1, d1)]]], fields=[a1, 
b1, c1, d1])
 ]]>
     </Resource>
diff --git 
a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/MiniBatchIntervalInferTest.xml
 
b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/MiniBatchIntervalInferTest.xml
index c2a132a..02c6a23 100644
--- 
a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/MiniBatchIntervalInferTest.xml
+++ 
b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/MiniBatchIntervalInferTest.xml
@@ -30,7 +30,7 @@ GROUP BY long, TUMBLE(rowtime, INTERVAL '10' SECOND)
       <![CDATA[
 LogicalProject(long=[$0], cnt=[$2], rt=[TUMBLE_END($1)])
 +- LogicalAggregate(group=[{0, 1}], cnt=[COUNT($2)])
-   +- LogicalProject(long=[$0], $f1=[TUMBLE($3, 10000:INTERVAL SECOND)], 
str=[$2])
+   +- LogicalProject(long=[$0], $f1=[$TUMBLE($3, 10000:INTERVAL SECOND)], 
str=[$2])
       +- LogicalTableScan(table=[[default_catalog, default_database, T1]])
 ]]>
     </Resource>
@@ -88,7 +88,7 @@ GlobalGroupAggregate(groupBy=[b], select=[b, COUNT(distinct$0 
count$0) AS EXPR$1
       <![CDATA[
 LogicalProject(b=[$0], EXPR$1=[$2], EXPR$2=[TUMBLE_START($1)], 
EXPR$3=[TUMBLE_END($1)])
 +- LogicalAggregate(group=[{0, 1}], EXPR$1=[COUNT($2)])
-   +- LogicalProject(b=[$1], $f1=[TUMBLE($4, 5000:INTERVAL SECOND)], a=[$0])
+   +- LogicalProject(b=[$1], $f1=[$TUMBLE($4, 5000:INTERVAL SECOND)], a=[$0])
       +- LogicalJoin(condition=[AND(=($0, $5), >=($4, -($9, 5000:INTERVAL 
SECOND)), <=($4, +($9, 10000:INTERVAL SECOND)))], joinType=[inner])
          :- LogicalWatermarkAssigner(rowtime=[rowtime], watermark=[-($4, 
0:INTERVAL MILLISECOND)])
          :  +- LogicalTableScan(table=[[default_catalog, default_database, 
MyTable1]])
@@ -149,7 +149,7 @@ LogicalProject(b=[$0], EXPR$1=[COUNT($1) OVER (PARTITION BY 
$0 ORDER BY $2 NULLS
 +- LogicalJoin(condition=[AND(=($1, $4), >=($2, -($5, 5000:INTERVAL SECOND)), 
<=($2, +($5, 10000:INTERVAL SECOND)))], joinType=[inner])
    :- LogicalProject(b=[$0], a=[$2], rt=[TUMBLE_ROWTIME($1)])
    :  +- LogicalAggregate(group=[{0, 1}], a=[COUNT($2)])
-   :     +- LogicalProject(b=[$1], $f1=[TUMBLE($4, 5000:INTERVAL SECOND)], 
a=[$0])
+   :     +- LogicalProject(b=[$1], $f1=[$TUMBLE($4, 5000:INTERVAL SECOND)], 
a=[$0])
    :        +- LogicalWatermarkAssigner(rowtime=[rowtime], watermark=[-($4, 
0:INTERVAL MILLISECOND)])
    :           +- LogicalTableScan(table=[[default_catalog, default_database, 
MyTable1]])
    +- LogicalProject(b=[$0], a=[$2], rt=[HOP_ROWTIME($1)])
@@ -251,7 +251,7 @@ 
LogicalSink(name=[`default_catalog`.`default_database`.`appendSink1`], fields=[a
    +- LogicalAggregate(group=[{0, 1}], EXPR$1=[LISTAGG($2, $3)])
       +- LogicalProject($f0=[HOP(TUMBLE_ROWTIME($0), 12000:INTERVAL SECOND, 
4000:INTERVAL SECOND)], id1=[$1], text=[$2], $f3=[_UTF-16LE'*'])
          +- LogicalAggregate(group=[{0, 1}], text=[LISTAGG($2, $3)])
-            +- LogicalProject($f0=[TUMBLE($1, 6000:INTERVAL SECOND)], 
id1=[$0], text=[$2], $f3=[_UTF-16LE'#'])
+            +- LogicalProject($f0=[$TUMBLE($1, 6000:INTERVAL SECOND)], 
id1=[$0], text=[$2], $f3=[_UTF-16LE'#'])
                +- LogicalFilter(condition=[AND(=($0, $3), >($1, -($4, 
300000:INTERVAL MINUTE)), <($1, +($4, 180000:INTERVAL MINUTE)))])
                   +- LogicalJoin(condition=[true], joinType=[inner])
                      :- LogicalWatermarkAssigner(rowtime=[rowtime], 
watermark=[-($1, 0:INTERVAL MILLISECOND)])
@@ -262,7 +262,7 @@ 
LogicalSink(name=[`default_catalog`.`default_database`.`appendSink1`], fields=[a
 LogicalSink(name=[`default_catalog`.`default_database`.`appendSink2`], 
fields=[a, b])
 +- LogicalProject(id1=[$1], EXPR$1=[$2])
    +- LogicalAggregate(group=[{0, 1}], EXPR$1=[LISTAGG($2, $3)])
-      +- LogicalProject($f0=[TUMBLE($1, 9000:INTERVAL SECOND)], id1=[$0], 
text=[$2], $f3=[_UTF-16LE'-'])
+      +- LogicalProject($f0=[$TUMBLE($1, 9000:INTERVAL SECOND)], id1=[$0], 
text=[$2], $f3=[_UTF-16LE'-'])
          +- LogicalFilter(condition=[AND(=($0, $3), >($1, -($4, 
300000:INTERVAL MINUTE)), <($1, +($4, 180000:INTERVAL MINUTE)))])
             +- LogicalJoin(condition=[true], joinType=[inner])
                :- LogicalWatermarkAssigner(rowtime=[rowtime], watermark=[-($1, 
0:INTERVAL MILLISECOND)])
@@ -274,7 +274,7 @@ 
LogicalSink(name=[`default_catalog`.`default_database`.`appendSink3`], fields=[a
 +- LogicalAggregate(group=[{0}], EXPR$1=[COUNT($1)])
    +- LogicalProject(id1=[$1], text=[$2])
       +- LogicalAggregate(group=[{0, 1}], text=[LISTAGG($2, $3)])
-         +- LogicalProject($f0=[TUMBLE($1, 6000:INTERVAL SECOND)], id1=[$0], 
text=[$2], $f3=[_UTF-16LE'#'])
+         +- LogicalProject($f0=[$TUMBLE($1, 6000:INTERVAL SECOND)], id1=[$0], 
text=[$2], $f3=[_UTF-16LE'#'])
             +- LogicalFilter(condition=[AND(=($0, $3), >($1, -($4, 
300000:INTERVAL MINUTE)), <($1, +($4, 180000:INTERVAL MINUTE)))])
                +- LogicalJoin(condition=[true], joinType=[inner])
                   :- LogicalWatermarkAssigner(rowtime=[rowtime], 
watermark=[-($1, 0:INTERVAL MILLISECOND)])
@@ -610,9 +610,9 @@ GlobalGroupAggregate(groupBy=[b], select=[b, COUNT(count$0) 
AS EXPR$1])
       <![CDATA[
 LogicalProject(b=[$0], EXPR$1=[$2])
 +- LogicalAggregate(group=[{0, 1}], EXPR$1=[SUM($2)])
-   +- LogicalProject(b=[$0], $f1=[TUMBLE(TUMBLE_ROWTIME($1), 5000:INTERVAL 
SECOND)], cnt=[$2])
+   +- LogicalProject(b=[$0], $f1=[$TUMBLE(TUMBLE_ROWTIME($1), 5000:INTERVAL 
SECOND)], cnt=[$2])
       +- LogicalAggregate(group=[{0, 1}], cnt=[COUNT($2)])
-         +- LogicalProject(b=[$1], $f1=[TUMBLE($4, 10000:INTERVAL SECOND)], 
a=[$0])
+         +- LogicalProject(b=[$1], $f1=[$TUMBLE($4, 10000:INTERVAL SECOND)], 
a=[$0])
             +- LogicalWatermarkAssigner(rowtime=[rowtime], watermark=[-($4, 
0:INTERVAL MILLISECOND)])
                +- LogicalTableScan(table=[[default_catalog, default_database, 
MyTable1]])
 ]]>
diff --git 
a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/RelTimeIndicatorConverterTest.xml
 
b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/RelTimeIndicatorConverterTest.xml
index 6c0fafe..39d6c48 100644
--- 
a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/RelTimeIndicatorConverterTest.xml
+++ 
b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/RelTimeIndicatorConverterTest.xml
@@ -139,9 +139,9 @@ SELECT TUMBLE_END(newrowtime, INTERVAL '30' SECOND), long, 
sum(`int`) FROM (
       <![CDATA[
 LogicalProject(EXPR$0=[TUMBLE_END($0)], long=[$1], EXPR$2=[$2])
 +- LogicalAggregate(group=[{0, 1}], EXPR$2=[SUM($2)])
-   +- LogicalProject($f0=[TUMBLE(TUMBLE_ROWTIME($0), 30000:INTERVAL SECOND)], 
long=[$1], int=[$2])
+   +- LogicalProject($f0=[$TUMBLE(TUMBLE_ROWTIME($0), 30000:INTERVAL SECOND)], 
long=[$1], int=[$2])
       +- LogicalAggregate(group=[{0, 1}], int=[SUM($2)])
-         +- LogicalProject($f0=[TUMBLE($0, 10000:INTERVAL SECOND)], long=[$1], 
int=[$2])
+         +- LogicalProject($f0=[$TUMBLE($0, 10000:INTERVAL SECOND)], 
long=[$1], int=[$2])
             +- LogicalTableScan(table=[[default_catalog, default_database, 
MyTable1]])
 ]]>
     </Resource>
@@ -229,7 +229,7 @@ GROUP BY long, TUMBLE(rowtime, INTERVAL '0.1' SECOND)
       <![CDATA[
 LogicalProject(EXPR$0=[$2], long=[$0])
 +- LogicalAggregate(group=[{0, 1}], EXPR$0=[MIN($2)])
-   +- LogicalProject(long=[$1], $f1=[TUMBLE($0, 100:INTERVAL SECOND)], 
rowtime=[$0])
+   +- LogicalProject(long=[$1], $f1=[$TUMBLE($0, 100:INTERVAL SECOND)], 
rowtime=[$0])
       +- LogicalTableScan(table=[[default_catalog, default_database, 
MyTable1]])
 ]]>
     </Resource>
@@ -279,7 +279,7 @@ FROM MyTable1
       <![CDATA[
 LogicalProject(EXPR$0=[TUMBLE_END($0)], long=[$1], EXPR$2=[$2])
 +- LogicalAggregate(group=[{0, 1}], EXPR$2=[SUM($2)])
-   +- LogicalProject($f0=[TUMBLE($0, 10000:INTERVAL SECOND)], long=[$1], 
int=[$2])
+   +- LogicalProject($f0=[$TUMBLE($0, 10000:INTERVAL SECOND)], long=[$1], 
int=[$2])
       +- LogicalTableScan(table=[[default_catalog, default_database, 
MyTable1]])
 ]]>
     </Resource>
@@ -305,7 +305,7 @@ HAVING QUARTER(TUMBLE_END(rowtime, INTERVAL '1' SECOND)) = 1
 LogicalProject(EXPR$0=[$2], long=[$0])
 +- LogicalFilter(condition=[=(EXTRACT(FLAG(QUARTER), TUMBLE_END($1)), 1)])
    +- LogicalAggregate(group=[{0, 1}], EXPR$0=[MIN($2)])
-      +- LogicalProject(long=[$1], $f1=[TUMBLE($0, 1000:INTERVAL SECOND)], 
rowtime=[$0])
+      +- LogicalProject(long=[$1], $f1=[$TUMBLE($0, 1000:INTERVAL SECOND)], 
rowtime=[$0])
          +- LogicalTableScan(table=[[default_catalog, default_database, 
MyTable1]])
 ]]>
     </Resource>
@@ -334,7 +334,7 @@ FROM MyTable1
       <![CDATA[
 LogicalProject(rowtime=[TUMBLE_END($1)], long=[$0], EXPR$2=[$2])
 +- LogicalAggregate(group=[{0, 1}], EXPR$2=[SUM($2)])
-   +- LogicalProject(long=[$1], $f1=[TUMBLE($0, 100:INTERVAL SECOND)], 
int=[$2])
+   +- LogicalProject(long=[$1], $f1=[$TUMBLE($0, 100:INTERVAL SECOND)], 
int=[$2])
       +- LogicalTableScan(table=[[default_catalog, default_database, 
MyTable1]])
 ]]>
     </Resource>
diff --git 
a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/TableSourceTest.xml
 
b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/TableSourceTest.xml
index 837fa08..209eab3 100644
--- 
a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/TableSourceTest.xml
+++ 
b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/TableSourceTest.xml
@@ -117,7 +117,7 @@ FROM rowTimeT WHERE val > 100
       <![CDATA[
 LogicalProject(name=[$0], EXPR$1=[TUMBLE_END($1)], EXPR$2=[$2])
 +- LogicalAggregate(group=[{0, 1}], EXPR$2=[AVG($2)])
-   +- LogicalProject(name=[$3], $f1=[TUMBLE($1, 600000:INTERVAL MINUTE)], 
val=[$2])
+   +- LogicalProject(name=[$3], $f1=[$TUMBLE($1, 600000:INTERVAL MINUTE)], 
val=[$2])
       +- LogicalFilter(condition=[>($2, 100)])
          +- LogicalTableScan(table=[[default_catalog, default_database, 
rowTimeT]])
 ]]>
diff --git 
a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/agg/WindowAggregateTest.xml
 
b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/agg/WindowAggregateTest.xml
index db86af3..13553e4 100644
--- 
a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/agg/WindowAggregateTest.xml
+++ 
b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/agg/WindowAggregateTest.xml
@@ -34,7 +34,7 @@ FROM MyTable
       <![CDATA[
 LogicalProject(EXPR$0=[$1], EXPR$1=[$2], EXPR$2=[$3], EXPR$3=[$4], 
EXPR$4=[TUMBLE_START($0)], EXPR$5=[TUMBLE_END($0)])
 +- LogicalAggregate(group=[{0}], EXPR$0=[VAR_POP($1)], EXPR$1=[VAR_SAMP($1)], 
EXPR$2=[STDDEV_POP($1)], EXPR$3=[STDDEV_SAMP($1)])
-   +- LogicalProject($f0=[TUMBLE($4, 900000:INTERVAL MINUTE)], c=[$2])
+   +- LogicalProject($f0=[$TUMBLE($4, 900000:INTERVAL MINUTE)], c=[$2])
       +- LogicalTableScan(table=[[default_catalog, default_database, MyTable]])
 ]]>
     </Resource>
@@ -48,53 +48,6 @@ Calc(select=[/(-($f0, /(*($f1, $f1), $f2)), $f2) AS EXPR$0, 
/(-($f0, /(*($f1, $f
 ]]>
     </Resource>
   </TestCase>
-  <TestCase name="testWindowAggregateWithDifferentWindows">
-    <Resource name="sql">
-      <![CDATA[
-WITH window_1h AS (
-    SELECT 1
-    FROM MyTable
-    GROUP BY HOP(`rowtime`, INTERVAL '1' HOUR, INTERVAL '1' HOUR)
-),
-
-window_2h AS (
-    SELECT 1
-    FROM MyTable
-    GROUP BY HOP(`rowtime`, INTERVAL '1' HOUR, INTERVAL '2' HOUR)
-)
-
-(SELECT * FROM window_1h)
-UNION ALL
-(SELECT * FROM window_2h)
-]]>
-    </Resource>
-    <Resource name="planBefore">
-      <![CDATA[
-LogicalUnion(all=[true])
-:- LogicalProject(EXPR$0=[1])
-:  +- LogicalAggregate(group=[{0}])
-:     +- LogicalProject($f0=[HOP($4, 3600000:INTERVAL HOUR, 3600000:INTERVAL 
HOUR)])
-:        +- LogicalTableScan(table=[[default_catalog, default_database, 
MyTable]])
-+- LogicalProject(EXPR$0=[1])
-   +- LogicalAggregate(group=[{0}])
-      +- LogicalProject($f0=[HOP($4, 3600000:INTERVAL HOUR, 7200000:INTERVAL 
HOUR)])
-         +- LogicalTableScan(table=[[default_catalog, default_database, 
MyTable]])
-]]>
-    </Resource>
-    <Resource name="planAfter">
-      <![CDATA[
-Union(all=[true], union=[EXPR$0])
-:- Calc(select=[1 AS EXPR$0])
-:  +- GroupWindowAggregate(window=[SlidingGroupWindow('w$, rowtime, 3600000, 
3600000)], select=[])
-:     +- Exchange(distribution=[single], reuse_id=[1])
-:        +- Calc(select=[rowtime])
-:           +- DataStreamScan(table=[[default_catalog, default_database, 
MyTable]], fields=[a, b, c, proctime, rowtime])
-+- Calc(select=[1 AS EXPR$0])
-   +- GroupWindowAggregate(window=[SlidingGroupWindow('w$, rowtime, 7200000, 
3600000)], select=[])
-      +- Reused(reference_id=[1])
-]]>
-    </Resource>
-  </TestCase>
   <TestCase name="testExpressionOnWindowAuxFunction">
     <Resource name="sql">
       <![CDATA[
@@ -108,7 +61,7 @@ FROM MyTable
       <![CDATA[
 LogicalProject(EXPR$0=[$1], EXPR$1=[+(TUMBLE_END($0), 60000:INTERVAL MINUTE)])
 +- LogicalAggregate(group=[{0}], EXPR$0=[COUNT()])
-   +- LogicalProject($f0=[TUMBLE($4, 900000:INTERVAL MINUTE)])
+   +- LogicalProject($f0=[$TUMBLE($4, 900000:INTERVAL MINUTE)])
       +- LogicalTableScan(table=[[default_catalog, default_database, MyTable]])
 ]]>
     </Resource>
@@ -213,7 +166,7 @@ GroupWindowAggregate(window=[SlidingGroupWindow('w$, $f2, 
60000, 1000)], select=
       <![CDATA[
 LogicalProject(EXPR$0=[$1])
 +- LogicalAggregate(group=[{0}], EXPR$0=[COUNT()])
-   +- LogicalProject($f0=[TUMBLE($3, 3024000000:INTERVAL DAY)])
+   +- LogicalProject($f0=[$TUMBLE($3, 3024000000:INTERVAL DAY)])
       +- LogicalTableScan(table=[[default_catalog, default_database, MyTable]])
 ]]>
     </Resource>
@@ -243,7 +196,7 @@ LogicalProject(EXPR$0=[$1])
 +- LogicalAggregate(group=[{0}], EXPR$0=[weightedAvg($1, $2)])
    +- LogicalProject(b=[$1], c=[$2], a=[$0])
       +- LogicalAggregate(group=[{0, 1, 2, 3}])
-         +- LogicalProject(a=[$0], b=[$1], c=[$2], $f3=[TUMBLE($4, 
900000:INTERVAL MINUTE)])
+         +- LogicalProject(a=[$0], b=[$1], c=[$2], $f3=[$TUMBLE($4, 
900000:INTERVAL MINUTE)])
             +- LogicalTableScan(table=[[default_catalog, default_database, 
MyTable]])
 ]]>
     </Resource>
@@ -380,9 +333,9 @@ GROUP BY TUMBLE(zzzzz, INTERVAL '0.004' SECOND)
       <![CDATA[
 LogicalProject(EXPR$0=[TUMBLE_ROWTIME($0)], EXPR$1=[TUMBLE_END($0)], a=[$1])
 +- LogicalAggregate(group=[{0}], a=[COUNT()])
-   +- LogicalProject($f0=[TUMBLE(TUMBLE_ROWTIME($0), 4:INTERVAL SECOND)], 
a=[$1])
+   +- LogicalProject($f0=[$TUMBLE(TUMBLE_ROWTIME($0), 4:INTERVAL SECOND)], 
a=[$1])
       +- LogicalAggregate(group=[{0}], a=[COUNT($1)])
-         +- LogicalProject($f0=[TUMBLE($4, 2:INTERVAL SECOND)], a=[$0])
+         +- LogicalProject($f0=[$TUMBLE($4, 2:INTERVAL SECOND)], a=[$0])
             +- LogicalTableScan(table=[[default_catalog, default_database, 
MyTable]])
 ]]>
     </Resource>
@@ -416,7 +369,7 @@ LogicalProject(EXPR$0=[$2])
 +- LogicalAggregate(group=[{0, 1}], EXPR$0=[weightedAvg($2, $3)])
    +- LogicalProject(b=[$1], d=[$4], c=[$2], a=[$0])
       +- LogicalAggregate(group=[{0, 1, 2, 3}], d=[COUNT()])
-         +- LogicalProject(a=[$0], b=[$1], c=[$2], $f3=[TUMBLE($4, 
900000:INTERVAL MINUTE)])
+         +- LogicalProject(a=[$0], b=[$1], c=[$2], $f3=[$TUMBLE($4, 
900000:INTERVAL MINUTE)])
             +- LogicalTableScan(table=[[default_catalog, default_database, 
MyTable]])
 ]]>
     </Resource>
@@ -453,7 +406,7 @@ GROUP BY TUMBLE(rowtime, INTERVAL '15' MINUTE)
       <![CDATA[
 LogicalProject(s=[$1], a=[$2], wStart=[TUMBLE_START($0)])
 +- LogicalAggregate(group=[{0}], s=[SUM($1)], a=[AVG($1)])
-   +- LogicalProject($f0=[TUMBLE($4, 900000:INTERVAL MINUTE)], 
correct=[CASE(=($0, 1), 1, 99)])
+   +- LogicalProject($f0=[$TUMBLE($4, 900000:INTERVAL MINUTE)], 
correct=[CASE(=($0, 1), 1, 99)])
       +- LogicalTableScan(table=[[default_catalog, default_database, MyTable]])
 ]]>
     </Resource>
@@ -520,37 +473,32 @@ GroupWindowAggregate(window=[SessionGroupWindow('w$, $f2, 
60000)], select=[SUM(a
 ]]>
     </Resource>
   </TestCase>
-  <TestCase name="testTumbleFunAndRegularAggFunInGroupBy">
+  <TestCase name="testTumbleFunction">
     <Resource name="sql">
       <![CDATA[
-SELECT weightedAvg(c, a) FROM
-    (SELECT a, b, c, count(*) d,
-        TUMBLE_START(rowtime, INTERVAL '15' MINUTE) as ping_start
-     FROM MyTable
-         GROUP BY a, b, c, TUMBLE(rowtime, INTERVAL '15' MINUTE)) AS t1
-GROUP BY b, d, ping_start
+SELECT COUNT(*),
+    weightedAvg(c, a) AS wAvg,
+    TUMBLE_START(rowtime, INTERVAL '15' MINUTE),
+    TUMBLE_END(rowtime, INTERVAL '15' MINUTE)
+FROM MyTable
+    GROUP BY TUMBLE(rowtime, INTERVAL '15' MINUTE)
       ]]>
     </Resource>
     <Resource name="planBefore">
       <![CDATA[
-LogicalProject(EXPR$0=[$3])
-+- LogicalAggregate(group=[{0, 1, 2}], EXPR$0=[weightedAvg($3, $4)])
-   +- LogicalProject(b=[$1], d=[$4], ping_start=[TUMBLE_START($3)], c=[$2], 
a=[$0])
-      +- LogicalAggregate(group=[{0, 1, 2, 3}], d=[COUNT()])
-         +- LogicalProject(a=[$0], b=[$1], c=[$2], $f3=[TUMBLE($4, 
900000:INTERVAL MINUTE)])
-            +- LogicalTableScan(table=[[default_catalog, default_database, 
MyTable]])
+LogicalProject(EXPR$0=[$1], wAvg=[$2], EXPR$2=[TUMBLE_START($0)], 
EXPR$3=[TUMBLE_END($0)])
++- LogicalAggregate(group=[{0}], EXPR$0=[COUNT()], wAvg=[weightedAvg($1, $2)])
+   +- LogicalProject($f0=[$TUMBLE($4, 900000:INTERVAL MINUTE)], c=[$2], a=[$0])
+      +- LogicalTableScan(table=[[default_catalog, default_database, MyTable]])
 ]]>
     </Resource>
     <Resource name="planAfter">
       <![CDATA[
-Calc(select=[EXPR$0])
-+- GroupAggregate(groupBy=[b, d, ping_start], select=[b, d, ping_start, 
weightedAvg(c, a) AS EXPR$0])
-   +- Exchange(distribution=[hash[b, d, ping_start]])
-      +- Calc(select=[b, d, w$start AS ping_start, c, a])
-         +- GroupWindowAggregate(groupBy=[a, b, c], 
window=[TumblingGroupWindow('w$, rowtime, 900000)], properties=[w$start, w$end, 
w$rowtime, w$proctime], select=[a, b, c, COUNT(*) AS d, start('w$) AS w$start, 
end('w$) AS w$end, rowtime('w$) AS w$rowtime, proctime('w$) AS w$proctime])
-            +- Exchange(distribution=[hash[a, b, c]])
-               +- Calc(select=[a, b, c, rowtime])
-                  +- DataStreamScan(table=[[default_catalog, default_database, 
MyTable]], fields=[a, b, c, proctime, rowtime])
+Calc(select=[EXPR$0, wAvg, w$start AS EXPR$2, w$end AS EXPR$3])
++- GroupWindowAggregate(window=[TumblingGroupWindow('w$, rowtime, 900000)], 
properties=[w$start, w$end, w$rowtime, w$proctime], select=[COUNT(*) AS EXPR$0, 
weightedAvg(c, a) AS wAvg, start('w$) AS w$start, end('w$) AS w$end, 
rowtime('w$) AS w$rowtime, proctime('w$) AS w$proctime])
+   +- Exchange(distribution=[single])
+      +- Calc(select=[rowtime, c, a])
+         +- DataStreamScan(table=[[default_catalog, default_database, 
MyTable]], fields=[a, b, c, proctime, rowtime])
 ]]>
     </Resource>
   </TestCase>
@@ -572,7 +520,7 @@ LogicalProject(EXPR$0=[$2])
 +- LogicalAggregate(group=[{0, 1}], EXPR$0=[weightedAvg($2, $3)])
    +- LogicalProject(b=[$1], ping_start=[TUMBLE_START($3)], c=[$2], a=[$0])
       +- LogicalAggregate(group=[{0, 1, 2, 3}])
-         +- LogicalProject(a=[$0], b=[$1], c=[$2], $f3=[TUMBLE($4, 
900000:INTERVAL MINUTE)])
+         +- LogicalProject(a=[$0], b=[$1], c=[$2], $f3=[$TUMBLE($4, 
900000:INTERVAL MINUTE)])
             +- LogicalTableScan(table=[[default_catalog, default_database, 
MyTable]])
 ]]>
     </Resource>
@@ -589,94 +537,104 @@ Calc(select=[EXPR$0])
 ]]>
     </Resource>
   </TestCase>
-  <TestCase name="testTumbleFunction">
+  <TestCase name="testTumblingWindowWithProctime">
     <Resource name="sql">
-      <![CDATA[
-SELECT COUNT(*),
-    weightedAvg(c, a) AS wAvg,
-    TUMBLE_START(rowtime, INTERVAL '15' MINUTE),
-    TUMBLE_END(rowtime, INTERVAL '15' MINUTE)
-FROM MyTable
-    GROUP BY TUMBLE(rowtime, INTERVAL '15' MINUTE)
-      ]]>
+      <![CDATA[select sum(a), max(b) from MyTable1 group by TUMBLE(c, INTERVAL 
'1' SECOND)]]>
     </Resource>
     <Resource name="planBefore">
       <![CDATA[
-LogicalProject(EXPR$0=[$1], wAvg=[$2], EXPR$2=[TUMBLE_START($0)], 
EXPR$3=[TUMBLE_END($0)])
-+- LogicalAggregate(group=[{0}], EXPR$0=[COUNT()], wAvg=[weightedAvg($1, $2)])
-   +- LogicalProject($f0=[TUMBLE($4, 900000:INTERVAL MINUTE)], c=[$2], a=[$0])
-      +- LogicalTableScan(table=[[default_catalog, default_database, MyTable]])
+LogicalProject(EXPR$0=[$1], EXPR$1=[$2])
++- LogicalAggregate(group=[{0}], EXPR$0=[SUM($1)], EXPR$1=[MAX($2)])
+   +- LogicalProject($f0=[$TUMBLE(PROCTIME(), 1000:INTERVAL SECOND)], a=[$0], 
b=[$1])
+      +- LogicalTableScan(table=[[default_catalog, default_database, MyTable1, 
source: [CollectionTableSource(a, b)]]])
 ]]>
     </Resource>
     <Resource name="planAfter">
       <![CDATA[
-Calc(select=[EXPR$0, wAvg, w$start AS EXPR$2, w$end AS EXPR$3])
-+- GroupWindowAggregate(window=[TumblingGroupWindow('w$, rowtime, 900000)], 
properties=[w$start, w$end, w$rowtime, w$proctime], select=[COUNT(*) AS EXPR$0, 
weightedAvg(c, a) AS wAvg, start('w$) AS w$start, end('w$) AS w$end, 
rowtime('w$) AS w$rowtime, proctime('w$) AS w$proctime])
-   +- Exchange(distribution=[single])
-      +- Calc(select=[rowtime, c, a])
-         +- DataStreamScan(table=[[default_catalog, default_database, 
MyTable]], fields=[a, b, c, proctime, rowtime])
+GroupWindowAggregate(window=[TumblingGroupWindow('w$, $f2, 1000)], 
select=[SUM(a) AS EXPR$0, MAX(b) AS EXPR$1])
++- Exchange(distribution=[single])
+   +- TableSourceScan(table=[[default_catalog, default_database, MyTable1, 
source: [CollectionTableSource(a, b)]]], fields=[a, b])
 ]]>
     </Resource>
   </TestCase>
-  <TestCase name="testTumblingWindowWithProctime">
+  <TestCase name="testTumbleFunAndRegularAggFunInGroupBy">
     <Resource name="sql">
-      <![CDATA[select sum(a), max(b) from MyTable1 group by TUMBLE(c, INTERVAL 
'1' SECOND)]]>
+      <![CDATA[
+SELECT weightedAvg(c, a) FROM
+    (SELECT a, b, c, count(*) d,
+        TUMBLE_START(rowtime, INTERVAL '15' MINUTE) as ping_start
+     FROM MyTable
+         GROUP BY a, b, c, TUMBLE(rowtime, INTERVAL '15' MINUTE)) AS t1
+GROUP BY b, d, ping_start
+      ]]>
     </Resource>
     <Resource name="planBefore">
       <![CDATA[
-LogicalProject(EXPR$0=[$1], EXPR$1=[$2])
-+- LogicalAggregate(group=[{0}], EXPR$0=[SUM($1)], EXPR$1=[MAX($2)])
-   +- LogicalProject($f0=[TUMBLE(PROCTIME(), 1000:INTERVAL SECOND)], a=[$0], 
b=[$1])
-      +- LogicalTableScan(table=[[default_catalog, default_database, MyTable1, 
source: [CollectionTableSource(a, b)]]])
+LogicalProject(EXPR$0=[$3])
++- LogicalAggregate(group=[{0, 1, 2}], EXPR$0=[weightedAvg($3, $4)])
+   +- LogicalProject(b=[$1], d=[$4], ping_start=[TUMBLE_START($3)], c=[$2], 
a=[$0])
+      +- LogicalAggregate(group=[{0, 1, 2, 3}], d=[COUNT()])
+         +- LogicalProject(a=[$0], b=[$1], c=[$2], $f3=[$TUMBLE($4, 
900000:INTERVAL MINUTE)])
+            +- LogicalTableScan(table=[[default_catalog, default_database, 
MyTable]])
 ]]>
     </Resource>
     <Resource name="planAfter">
       <![CDATA[
-GroupWindowAggregate(window=[TumblingGroupWindow('w$, $f2, 1000)], 
select=[SUM(a) AS EXPR$0, MAX(b) AS EXPR$1])
-+- Exchange(distribution=[single])
-   +- TableSourceScan(table=[[default_catalog, default_database, MyTable1, 
source: [CollectionTableSource(a, b)]]], fields=[a, b])
+Calc(select=[EXPR$0])
++- GroupAggregate(groupBy=[b, d, ping_start], select=[b, d, ping_start, 
weightedAvg(c, a) AS EXPR$0])
+   +- Exchange(distribution=[hash[b, d, ping_start]])
+      +- Calc(select=[b, d, w$start AS ping_start, c, a])
+         +- GroupWindowAggregate(groupBy=[a, b, c], 
window=[TumblingGroupWindow('w$, rowtime, 900000)], properties=[w$start, w$end, 
w$rowtime, w$proctime], select=[a, b, c, COUNT(*) AS d, start('w$) AS w$start, 
end('w$) AS w$end, rowtime('w$) AS w$rowtime, proctime('w$) AS w$proctime])
+            +- Exchange(distribution=[hash[a, b, c]])
+               +- Calc(select=[a, b, c, rowtime])
+                  +- DataStreamScan(table=[[default_catalog, default_database, 
MyTable]], fields=[a, b, c, proctime, rowtime])
 ]]>
     </Resource>
   </TestCase>
   <TestCase name="testWindowAggregateWithDifferentWindows">
     <Resource name="sql">
       <![CDATA[
-SELECT
-  SUM(correct) AS s,
-  AVG(correct) AS a,
-  TUMBLE_START(rowtime, INTERVAL '15' MINUTE) AS wStart
-FROM (
-  SELECT CASE a
-      WHEN 1 THEN 1
-      ELSE 99
-    END AS correct, rowtime
-  FROM MyTable
+WITH window_1h AS (
+    SELECT 1
+    FROM MyTable
+    GROUP BY HOP(`rowtime`, INTERVAL '1' HOUR, INTERVAL '1' HOUR)
+),
+
+window_2h AS (
+    SELECT 1
+    FROM MyTable
+    GROUP BY HOP(`rowtime`, INTERVAL '1' HOUR, INTERVAL '2' HOUR)
 )
-GROUP BY TUMBLE(rowtime, INTERVAL '15' MINUTE)
-      ]]>
+
+(SELECT * FROM window_1h)
+UNION ALL
+(SELECT * FROM window_2h)
+]]>
     </Resource>
     <Resource name="planBefore">
       <![CDATA[
-LogicalProject(s=[$1], a=[$2], wStart=[TUMBLE_START($0)])
-+- LogicalAggregate(group=[{0}], s=[SUM($1)], a=[AVG($1)])
-   +- LogicalProject($f0=[TUMBLE($4, 900000:INTERVAL MINUTE)], 
correct=[CASE(=($0, 1), 1, 99)])
-      +- LogicalTableScan(table=[[default_catalog, default_database, MyTable]])
+LogicalUnion(all=[true])
+:- LogicalProject(EXPR$0=[1])
+:  +- LogicalAggregate(group=[{0}])
+:     +- LogicalProject($f0=[HOP($4, 3600000:INTERVAL HOUR, 3600000:INTERVAL 
HOUR)])
+:        +- LogicalTableScan(table=[[default_catalog, default_database, 
MyTable]])
++- LogicalProject(EXPR$0=[1])
+   +- LogicalAggregate(group=[{0}])
+      +- LogicalProject($f0=[HOP($4, 3600000:INTERVAL HOUR, 7200000:INTERVAL 
HOUR)])
+         +- LogicalTableScan(table=[[default_catalog, default_database, 
MyTable]])
 ]]>
     </Resource>
     <Resource name="planAfter">
       <![CDATA[
 Union(all=[true], union=[EXPR$0])
 :- Calc(select=[1 AS EXPR$0])
-:  +- HashWindowAggregate(window=[SlidingGroupWindow('w$, ts, 3600000, 
3600000)], select=[])
-:     +- Exchange(distribution=[single])
-:        +- LocalHashWindowAggregate(window=[SlidingGroupWindow('w$, ts, 
3600000, 3600000)], select=[])
-:           +- Calc(select=[ts], reuse_id=[1])
-:              +- TableSourceScan(table=[[default_catalog, default_database, 
MyTable2, source: [TestTableSource(a, b, c, d, ts)]]], fields=[a, b, c, d, ts])
+:  +- GroupWindowAggregate(window=[SlidingGroupWindow('w$, rowtime, 3600000, 
3600000)], select=[])
+:     +- Exchange(distribution=[single], reuse_id=[1])
+:        +- Calc(select=[rowtime])
+:           +- DataStreamScan(table=[[default_catalog, default_database, 
MyTable]], fields=[a, b, c, proctime, rowtime])
 +- Calc(select=[1 AS EXPR$0])
-   +- HashWindowAggregate(window=[SlidingGroupWindow('w$, ts, 7200000, 
3600000)], select=[])
-      +- Exchange(distribution=[single])
-         +- LocalHashWindowAggregate(window=[SlidingGroupWindow('w$, ts, 
7200000, 3600000)], select=[])
-            +- Reused(reference_id=[1])
+   +- GroupWindowAggregate(window=[SlidingGroupWindow('w$, rowtime, 7200000, 
3600000)], select=[])
+      +- Reused(reference_id=[1])
 ]]>
     </Resource>
   </TestCase>
diff --git 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/catalog/BasicOperatorTable.scala
 
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/catalog/BasicOperatorTable.scala
index 11d9a4b..5ec1fc7 100644
--- 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/catalog/BasicOperatorTable.scala
+++ 
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/catalog/BasicOperatorTable.scala
@@ -260,6 +260,8 @@ object BasicOperatorTable {
     */
 
   val TUMBLE: SqlGroupedWindowFunction = new SqlGroupedWindowFunction(
+    // The TUMBLE group function name was hard-coded to $TUMBLE in CALCITE-3382.
+    "$TUMBLE",
     SqlKind.TUMBLE,
     null,
     OperandTypes.or(OperandTypes.DATETIME_INTERVAL, 
OperandTypes.DATETIME_INTERVAL_TIME)) {

Reply via email to