beyond1920 commented on a change in pull request #15195:
URL: https://github.com/apache/flink/pull/15195#discussion_r594034270



##########
File path: 
flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/join/WindowJoinTest.xml
##########
@@ -0,0 +1,1041 @@
+<?xml version="1.0" ?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements.  See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to you under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License.  You may obtain a copy of the License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+-->
+<Root>
+  <TestCase name="testCantMergeWindowTVF_Cumulate">
+    <Resource name="sql">
+      <![CDATA[
+SELECT L.a, L.b, L.c, R.a, R.b, R.c
+FROM (
+  SELECT *
+  FROM TABLE(
+  CUMULATE(TABLE MyTable, DESCRIPTOR(rowtime), INTERVAL '10' MINUTE, INTERVAL 
'1' HOUR))
+) L
+JOIN (
+  SELECT *
+  FROM TABLE(
+  CUMULATE(TABLE MyTable2, DESCRIPTOR(rowtime), INTERVAL '10' MINUTE, INTERVAL 
'1' HOUR))
+) R
+ON L.window_start = R.window_start AND L.window_end = R.window_end AND L.a = 
R.a
+      ]]>
+    </Resource>
+    <Resource name="ast">
+      <![CDATA[
+LogicalProject(a=[$0], b=[$1], c=[$2], a0=[$8], b0=[$9], c0=[$10])
++- LogicalJoin(condition=[AND(=($5, $13), =($6, $14), =($0, $8))], 
joinType=[inner])
+   :- LogicalProject(a=[$0], b=[$1], c=[$2], rowtime=[$3], proctime=[$4], 
window_start=[$5], window_end=[$6], window_time=[$7])
+   :  +- LogicalTableFunctionScan(invocation=[CUMULATE($4, DESCRIPTOR($3), 
600000:INTERVAL MINUTE, 3600000:INTERVAL HOUR)], rowType=[RecordType(INTEGER a, 
VARCHAR(2147483647) b, BIGINT c, TIME ATTRIBUTE(ROWTIME) rowtime, TIME 
ATTRIBUTE(PROCTIME) proctime, TIMESTAMP(3) window_start, TIMESTAMP(3) 
window_end, TIME ATTRIBUTE(ROWTIME) window_time)])
+   :     +- LogicalProject(a=[$0], b=[$1], c=[$2], rowtime=[$3], proctime=[$4])
+   :        +- LogicalWatermarkAssigner(rowtime=[rowtime], watermark=[-($3, 
1000:INTERVAL SECOND)])
+   :           +- LogicalProject(a=[$0], b=[$1], c=[$2], rowtime=[$3], 
proctime=[PROCTIME()])
+   :              +- LogicalTableScan(table=[[default_catalog, 
default_database, MyTable]])
+   +- LogicalProject(a=[$0], b=[$1], c=[$2], rowtime=[$3], proctime=[$4], 
window_start=[$5], window_end=[$6], window_time=[$7])
+      +- LogicalTableFunctionScan(invocation=[CUMULATE($4, DESCRIPTOR($3), 
600000:INTERVAL MINUTE, 3600000:INTERVAL HOUR)], rowType=[RecordType(INTEGER a, 
VARCHAR(2147483647) b, BIGINT c, TIME ATTRIBUTE(ROWTIME) rowtime, TIME 
ATTRIBUTE(PROCTIME) proctime, TIMESTAMP(3) window_start, TIMESTAMP(3) 
window_end, TIME ATTRIBUTE(ROWTIME) window_time)])
+         +- LogicalProject(a=[$0], b=[$1], c=[$2], rowtime=[$3], proctime=[$4])
+            +- LogicalWatermarkAssigner(rowtime=[rowtime], watermark=[-($3, 
1000:INTERVAL SECOND)])
+               +- LogicalProject(a=[$0], b=[$1], c=[$2], rowtime=[$3], 
proctime=[PROCTIME()])
+                  +- LogicalTableScan(table=[[default_catalog, 
default_database, MyTable2]])
+]]>
+    </Resource>
+    <Resource name="optimized rel plan">
+      <![CDATA[
+Calc(select=[a, b, c, a0, b0, c0])
++- WindowJoin(leftWindow=[CUMULATE(win_start=[window_start], 
win_end=[window_end], max_size=[1 h], step=[10 min])], 
rightWindow=[CUMULATE(win_start=[window_start], win_end=[window_end], 
max_size=[1 h], step=[10 min])], joinType=[InnerJoin], where=[=(a, a0)], 
select=[a, b, c, window_start, window_end, a0, b0, c0, window_start0, 
window_end0])
+   :- Exchange(distribution=[hash[a]])
+   :  +- Calc(select=[a, b, c, window_start, window_end])
+   :     +- WindowTableFunction(window=[CUMULATE(time_col=[rowtime], 
max_size=[1 h], step=[10 min])])
+   :        +- WatermarkAssigner(rowtime=[rowtime], watermark=[-(rowtime, 
1000:INTERVAL SECOND)])
+   :           +- Calc(select=[a, b, c, rowtime, PROCTIME() AS proctime])
+   :              +- TableSourceScan(table=[[default_catalog, 
default_database, MyTable]], fields=[a, b, c, rowtime])
+   +- Exchange(distribution=[hash[a]])
+      +- Calc(select=[a, b, c, window_start, window_end])
+         +- WindowTableFunction(window=[CUMULATE(time_col=[rowtime], 
max_size=[1 h], step=[10 min])])
+            +- WatermarkAssigner(rowtime=[rowtime], watermark=[-(rowtime, 
1000:INTERVAL SECOND)])
+               +- Calc(select=[a, b, c, rowtime, PROCTIME() AS proctime])
+                  +- TableSourceScan(table=[[default_catalog, 
default_database, MyTable2]], fields=[a, b, c, rowtime])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testCantMergeWindowTVF_CumulateOnProctime">
+    <Resource name="sql">
+      <![CDATA[
+SELECT L.a, L.b, L.c, R.a, R.b, R.c
+FROM (
+  SELECT *
+  FROM TABLE(
+  CUMULATE(TABLE MyTable, DESCRIPTOR(proctime), INTERVAL '10' MINUTE, INTERVAL 
'1' HOUR))
+) L
+JOIN (
+  SELECT *
+  FROM TABLE(
+  CUMULATE(TABLE MyTable2, DESCRIPTOR(proctime), INTERVAL '10' MINUTE, 
INTERVAL '1' HOUR))
+) R
+ON L.window_start = R.window_start AND L.window_end = R.window_end AND L.a = 
R.a
+      ]]>
+    </Resource>
+    <Resource name="ast">
+      <![CDATA[
+LogicalProject(a=[$0], b=[$1], c=[$2], a0=[$8], b0=[$9], c0=[$10])
++- LogicalJoin(condition=[AND(=($5, $13), =($6, $14), =($0, $8))], 
joinType=[inner])
+   :- LogicalProject(a=[$0], b=[$1], c=[$2], rowtime=[$3], proctime=[$4], 
window_start=[$5], window_end=[$6], window_time=[$7])
+   :  +- LogicalTableFunctionScan(invocation=[CUMULATE($4, DESCRIPTOR($4), 
600000:INTERVAL MINUTE, 3600000:INTERVAL HOUR)], rowType=[RecordType(INTEGER a, 
VARCHAR(2147483647) b, BIGINT c, TIME ATTRIBUTE(ROWTIME) rowtime, TIME 
ATTRIBUTE(PROCTIME) proctime, TIMESTAMP(3) window_start, TIMESTAMP(3) 
window_end, TIME ATTRIBUTE(PROCTIME) window_time)])
+   :     +- LogicalProject(a=[$0], b=[$1], c=[$2], rowtime=[$3], proctime=[$4])
+   :        +- LogicalWatermarkAssigner(rowtime=[rowtime], watermark=[-($3, 
1000:INTERVAL SECOND)])
+   :           +- LogicalProject(a=[$0], b=[$1], c=[$2], rowtime=[$3], 
proctime=[PROCTIME()])
+   :              +- LogicalTableScan(table=[[default_catalog, 
default_database, MyTable]])
+   +- LogicalProject(a=[$0], b=[$1], c=[$2], rowtime=[$3], proctime=[$4], 
window_start=[$5], window_end=[$6], window_time=[$7])
+      +- LogicalTableFunctionScan(invocation=[CUMULATE($4, DESCRIPTOR($4), 
600000:INTERVAL MINUTE, 3600000:INTERVAL HOUR)], rowType=[RecordType(INTEGER a, 
VARCHAR(2147483647) b, BIGINT c, TIME ATTRIBUTE(ROWTIME) rowtime, TIME 
ATTRIBUTE(PROCTIME) proctime, TIMESTAMP(3) window_start, TIMESTAMP(3) 
window_end, TIME ATTRIBUTE(PROCTIME) window_time)])
+         +- LogicalProject(a=[$0], b=[$1], c=[$2], rowtime=[$3], proctime=[$4])
+            +- LogicalWatermarkAssigner(rowtime=[rowtime], watermark=[-($3, 
1000:INTERVAL SECOND)])
+               +- LogicalProject(a=[$0], b=[$1], c=[$2], rowtime=[$3], 
proctime=[PROCTIME()])
+                  +- LogicalTableScan(table=[[default_catalog, 
default_database, MyTable2]])
+]]>
+    </Resource>
+    <Resource name="optimized rel plan">
+      <![CDATA[
+Calc(select=[a, b, c, a0, b0, c0])
++- WindowJoin(leftWindow=[CUMULATE(win_start=[window_start], 
win_end=[window_end], max_size=[1 h], step=[10 min])], 
rightWindow=[CUMULATE(win_start=[window_start], win_end=[window_end], 
max_size=[1 h], step=[10 min])], joinType=[InnerJoin], where=[=(a, a0)], 
select=[a, b, c, window_start, window_end, a0, b0, c0, window_start0, 
window_end0])
+   :- Exchange(distribution=[hash[a]])
+   :  +- Calc(select=[a, b, c, window_start, window_end])
+   :     +- WindowTableFunction(window=[CUMULATE(time_col=[proctime], 
max_size=[1 h], step=[10 min])])
+   :        +- WatermarkAssigner(rowtime=[rowtime], watermark=[-(rowtime, 
1000:INTERVAL SECOND)])
+   :           +- Calc(select=[a, b, c, rowtime, PROCTIME() AS proctime])
+   :              +- TableSourceScan(table=[[default_catalog, 
default_database, MyTable]], fields=[a, b, c, rowtime])
+   +- Exchange(distribution=[hash[a]])
+      +- Calc(select=[a, b, c, window_start, window_end])
+         +- WindowTableFunction(window=[CUMULATE(time_col=[proctime], 
max_size=[1 h], step=[10 min])])
+            +- WatermarkAssigner(rowtime=[rowtime], watermark=[-(rowtime, 
1000:INTERVAL SECOND)])
+               +- Calc(select=[a, b, c, rowtime, PROCTIME() AS proctime])
+                  +- TableSourceScan(table=[[default_catalog, 
default_database, MyTable2]], fields=[a, b, c, rowtime])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testCantMergeWindowTVF_Hop">
+    <Resource name="sql">
+      <![CDATA[
+SELECT L.a, L.b, L.c, R.a, R.b, R.c
+FROM (
+  SELECT *
+  FROM TABLE(
+  HOP(TABLE MyTable, DESCRIPTOR(rowtime), INTERVAL '5' MINUTE, INTERVAL '10' 
MINUTE))
+) L
+JOIN (
+  SELECT *
+  FROM TABLE(
+  HOP(TABLE MyTable2, DESCRIPTOR(rowtime), INTERVAL '5' MINUTE, INTERVAL '10' 
MINUTE))
+) R
+ON L.window_start = R.window_start AND L.window_end = R.window_end AND L.a = 
R.a
+      ]]>
+    </Resource>
+    <Resource name="ast">
+      <![CDATA[
+LogicalProject(a=[$0], b=[$1], c=[$2], a0=[$8], b0=[$9], c0=[$10])
++- LogicalJoin(condition=[AND(=($5, $13), =($6, $14), =($0, $8))], 
joinType=[inner])
+   :- LogicalProject(a=[$0], b=[$1], c=[$2], rowtime=[$3], proctime=[$4], 
window_start=[$5], window_end=[$6], window_time=[$7])
+   :  +- LogicalTableFunctionScan(invocation=[HOP($4, DESCRIPTOR($3), 
300000:INTERVAL MINUTE, 600000:INTERVAL MINUTE)], rowType=[RecordType(INTEGER 
a, VARCHAR(2147483647) b, BIGINT c, TIME ATTRIBUTE(ROWTIME) rowtime, TIME 
ATTRIBUTE(PROCTIME) proctime, TIMESTAMP(3) window_start, TIMESTAMP(3) 
window_end, TIME ATTRIBUTE(ROWTIME) window_time)])
+   :     +- LogicalProject(a=[$0], b=[$1], c=[$2], rowtime=[$3], proctime=[$4])
+   :        +- LogicalWatermarkAssigner(rowtime=[rowtime], watermark=[-($3, 
1000:INTERVAL SECOND)])
+   :           +- LogicalProject(a=[$0], b=[$1], c=[$2], rowtime=[$3], 
proctime=[PROCTIME()])
+   :              +- LogicalTableScan(table=[[default_catalog, 
default_database, MyTable]])
+   +- LogicalProject(a=[$0], b=[$1], c=[$2], rowtime=[$3], proctime=[$4], 
window_start=[$5], window_end=[$6], window_time=[$7])
+      +- LogicalTableFunctionScan(invocation=[HOP($4, DESCRIPTOR($3), 
300000:INTERVAL MINUTE, 600000:INTERVAL MINUTE)], rowType=[RecordType(INTEGER 
a, VARCHAR(2147483647) b, BIGINT c, TIME ATTRIBUTE(ROWTIME) rowtime, TIME 
ATTRIBUTE(PROCTIME) proctime, TIMESTAMP(3) window_start, TIMESTAMP(3) 
window_end, TIME ATTRIBUTE(ROWTIME) window_time)])
+         +- LogicalProject(a=[$0], b=[$1], c=[$2], rowtime=[$3], proctime=[$4])
+            +- LogicalWatermarkAssigner(rowtime=[rowtime], watermark=[-($3, 
1000:INTERVAL SECOND)])
+               +- LogicalProject(a=[$0], b=[$1], c=[$2], rowtime=[$3], 
proctime=[PROCTIME()])
+                  +- LogicalTableScan(table=[[default_catalog, 
default_database, MyTable2]])
+]]>
+    </Resource>
+    <Resource name="optimized rel plan">
+      <![CDATA[
+Calc(select=[a, b, c, a0, b0, c0])
++- WindowJoin(leftWindow=[HOP(win_start=[window_start], win_end=[window_end], 
size=[10 min], slide=[5 min])], rightWindow=[HOP(win_start=[window_start], 
win_end=[window_end], size=[10 min], slide=[5 min])], joinType=[InnerJoin], 
where=[=(a, a0)], select=[a, b, c, window_start, window_end, a0, b0, c0, 
window_start0, window_end0])
+   :- Exchange(distribution=[hash[a]])
+   :  +- Calc(select=[a, b, c, window_start, window_end])
+   :     +- WindowTableFunction(window=[HOP(time_col=[rowtime], size=[10 min], 
slide=[5 min])])
+   :        +- WatermarkAssigner(rowtime=[rowtime], watermark=[-(rowtime, 
1000:INTERVAL SECOND)])
+   :           +- Calc(select=[a, b, c, rowtime, PROCTIME() AS proctime])
+   :              +- TableSourceScan(table=[[default_catalog, 
default_database, MyTable]], fields=[a, b, c, rowtime])
+   +- Exchange(distribution=[hash[a]])
+      +- Calc(select=[a, b, c, window_start, window_end])
+         +- WindowTableFunction(window=[HOP(time_col=[rowtime], size=[10 min], 
slide=[5 min])])
+            +- WatermarkAssigner(rowtime=[rowtime], watermark=[-(rowtime, 
1000:INTERVAL SECOND)])
+               +- Calc(select=[a, b, c, rowtime, PROCTIME() AS proctime])
+                  +- TableSourceScan(table=[[default_catalog, 
default_database, MyTable2]], fields=[a, b, c, rowtime])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testCantMergeWindowTVF_HopOnProctime">
+    <Resource name="sql">
+      <![CDATA[
+SELECT L.a, L.b, L.c, R.a, R.b, R.c
+FROM (
+  SELECT *
+  FROM TABLE(
+  HOP(TABLE MyTable, DESCRIPTOR(proctime), INTERVAL '5' MINUTE, INTERVAL '10' 
MINUTE))
+) L
+JOIN (
+  SELECT *
+  FROM TABLE(
+  HOP(TABLE MyTable2, DESCRIPTOR(proctime), INTERVAL '5' MINUTE, INTERVAL '10' 
MINUTE))
+) R
+ON L.window_start = R.window_start AND L.window_end = R.window_end AND L.a = 
R.a
+      ]]>
+    </Resource>
+    <Resource name="ast">
+      <![CDATA[
+LogicalProject(a=[$0], b=[$1], c=[$2], a0=[$8], b0=[$9], c0=[$10])
++- LogicalJoin(condition=[AND(=($5, $13), =($6, $14), =($0, $8))], 
joinType=[inner])
+   :- LogicalProject(a=[$0], b=[$1], c=[$2], rowtime=[$3], proctime=[$4], 
window_start=[$5], window_end=[$6], window_time=[$7])
+   :  +- LogicalTableFunctionScan(invocation=[HOP($4, DESCRIPTOR($4), 
300000:INTERVAL MINUTE, 600000:INTERVAL MINUTE)], rowType=[RecordType(INTEGER 
a, VARCHAR(2147483647) b, BIGINT c, TIME ATTRIBUTE(ROWTIME) rowtime, TIME 
ATTRIBUTE(PROCTIME) proctime, TIMESTAMP(3) window_start, TIMESTAMP(3) 
window_end, TIME ATTRIBUTE(PROCTIME) window_time)])
+   :     +- LogicalProject(a=[$0], b=[$1], c=[$2], rowtime=[$3], proctime=[$4])
+   :        +- LogicalWatermarkAssigner(rowtime=[rowtime], watermark=[-($3, 
1000:INTERVAL SECOND)])
+   :           +- LogicalProject(a=[$0], b=[$1], c=[$2], rowtime=[$3], 
proctime=[PROCTIME()])
+   :              +- LogicalTableScan(table=[[default_catalog, 
default_database, MyTable]])
+   +- LogicalProject(a=[$0], b=[$1], c=[$2], rowtime=[$3], proctime=[$4], 
window_start=[$5], window_end=[$6], window_time=[$7])
+      +- LogicalTableFunctionScan(invocation=[HOP($4, DESCRIPTOR($4), 
300000:INTERVAL MINUTE, 600000:INTERVAL MINUTE)], rowType=[RecordType(INTEGER 
a, VARCHAR(2147483647) b, BIGINT c, TIME ATTRIBUTE(ROWTIME) rowtime, TIME 
ATTRIBUTE(PROCTIME) proctime, TIMESTAMP(3) window_start, TIMESTAMP(3) 
window_end, TIME ATTRIBUTE(PROCTIME) window_time)])
+         +- LogicalProject(a=[$0], b=[$1], c=[$2], rowtime=[$3], proctime=[$4])
+            +- LogicalWatermarkAssigner(rowtime=[rowtime], watermark=[-($3, 
1000:INTERVAL SECOND)])
+               +- LogicalProject(a=[$0], b=[$1], c=[$2], rowtime=[$3], 
proctime=[PROCTIME()])
+                  +- LogicalTableScan(table=[[default_catalog, 
default_database, MyTable2]])
+]]>
+    </Resource>
+    <Resource name="optimized rel plan">
+      <![CDATA[
+Calc(select=[a, b, c, a0, b0, c0])
++- WindowJoin(leftWindow=[HOP(win_start=[window_start], win_end=[window_end], 
size=[10 min], slide=[5 min])], rightWindow=[HOP(win_start=[window_start], 
win_end=[window_end], size=[10 min], slide=[5 min])], joinType=[InnerJoin], 
where=[=(a, a0)], select=[a, b, c, window_start, window_end, a0, b0, c0, 
window_start0, window_end0])
+   :- Exchange(distribution=[hash[a]])
+   :  +- Calc(select=[a, b, c, window_start, window_end])
+   :     +- WindowTableFunction(window=[HOP(time_col=[proctime], size=[10 
min], slide=[5 min])])
+   :        +- WatermarkAssigner(rowtime=[rowtime], watermark=[-(rowtime, 
1000:INTERVAL SECOND)])
+   :           +- Calc(select=[a, b, c, rowtime, PROCTIME() AS proctime])
+   :              +- TableSourceScan(table=[[default_catalog, 
default_database, MyTable]], fields=[a, b, c, rowtime])
+   +- Exchange(distribution=[hash[a]])
+      +- Calc(select=[a, b, c, window_start, window_end])
+         +- WindowTableFunction(window=[HOP(time_col=[proctime], size=[10 
min], slide=[5 min])])
+            +- WatermarkAssigner(rowtime=[rowtime], watermark=[-(rowtime, 
1000:INTERVAL SECOND)])
+               +- Calc(select=[a, b, c, rowtime, PROCTIME() AS proctime])
+                  +- TableSourceScan(table=[[default_catalog, 
default_database, MyTable2]], fields=[a, b, c, rowtime])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testCantMergeWindowTVF_Tumble">
+    <Resource name="sql">
+      <![CDATA[
+SELECT L.a, L.b, L.c, R.a, R.b, R.c
+FROM (
+  SELECT *
+  FROM TABLE(TUMBLE(TABLE MyTable, DESCRIPTOR(rowtime), INTERVAL '15' MINUTE))
+) L
+JOIN (
+  SELECT *
+  FROM TABLE(TUMBLE(TABLE MyTable2, DESCRIPTOR(rowtime), INTERVAL '15' MINUTE))
+) R
+ON L.window_start = R.window_start AND L.window_end = R.window_end AND L.a = 
R.a
+      ]]>
+    </Resource>
+    <Resource name="ast">
+      <![CDATA[
+LogicalProject(a=[$0], b=[$1], c=[$2], a0=[$8], b0=[$9], c0=[$10])
++- LogicalJoin(condition=[AND(=($5, $13), =($6, $14), =($0, $8))], 
joinType=[inner])
+   :- LogicalProject(a=[$0], b=[$1], c=[$2], rowtime=[$3], proctime=[$4], 
window_start=[$5], window_end=[$6], window_time=[$7])
+   :  +- LogicalTableFunctionScan(invocation=[TUMBLE($4, DESCRIPTOR($3), 
900000:INTERVAL MINUTE)], rowType=[RecordType(INTEGER a, VARCHAR(2147483647) b, 
BIGINT c, TIME ATTRIBUTE(ROWTIME) rowtime, TIME ATTRIBUTE(PROCTIME) proctime, 
TIMESTAMP(3) window_start, TIMESTAMP(3) window_end, TIME ATTRIBUTE(ROWTIME) 
window_time)])
+   :     +- LogicalProject(a=[$0], b=[$1], c=[$2], rowtime=[$3], proctime=[$4])
+   :        +- LogicalWatermarkAssigner(rowtime=[rowtime], watermark=[-($3, 
1000:INTERVAL SECOND)])
+   :           +- LogicalProject(a=[$0], b=[$1], c=[$2], rowtime=[$3], 
proctime=[PROCTIME()])
+   :              +- LogicalTableScan(table=[[default_catalog, 
default_database, MyTable]])
+   +- LogicalProject(a=[$0], b=[$1], c=[$2], rowtime=[$3], proctime=[$4], 
window_start=[$5], window_end=[$6], window_time=[$7])
+      +- LogicalTableFunctionScan(invocation=[TUMBLE($4, DESCRIPTOR($3), 
900000:INTERVAL MINUTE)], rowType=[RecordType(INTEGER a, VARCHAR(2147483647) b, 
BIGINT c, TIME ATTRIBUTE(ROWTIME) rowtime, TIME ATTRIBUTE(PROCTIME) proctime, 
TIMESTAMP(3) window_start, TIMESTAMP(3) window_end, TIME ATTRIBUTE(ROWTIME) 
window_time)])
+         +- LogicalProject(a=[$0], b=[$1], c=[$2], rowtime=[$3], proctime=[$4])
+            +- LogicalWatermarkAssigner(rowtime=[rowtime], watermark=[-($3, 
1000:INTERVAL SECOND)])
+               +- LogicalProject(a=[$0], b=[$1], c=[$2], rowtime=[$3], 
proctime=[PROCTIME()])
+                  +- LogicalTableScan(table=[[default_catalog, 
default_database, MyTable2]])
+]]>
+    </Resource>
+    <Resource name="optimized rel plan">
+      <![CDATA[
+Calc(select=[a, b, c, a0, b0, c0])
++- WindowJoin(leftWindow=[TUMBLE(win_start=[window_start], 
win_end=[window_end], size=[15 min])], 
rightWindow=[TUMBLE(win_start=[window_start], win_end=[window_end], size=[15 
min])], joinType=[InnerJoin], where=[=(a, a0)], select=[a, b, c, window_start, 
window_end, a0, b0, c0, window_start0, window_end0])
+   :- Exchange(distribution=[hash[a]])
+   :  +- Calc(select=[a, b, c, window_start, window_end])
+   :     +- WindowTableFunction(window=[TUMBLE(time_col=[rowtime], size=[15 
min])])
+   :        +- WatermarkAssigner(rowtime=[rowtime], watermark=[-(rowtime, 
1000:INTERVAL SECOND)])
+   :           +- Calc(select=[a, b, c, rowtime, PROCTIME() AS proctime])
+   :              +- TableSourceScan(table=[[default_catalog, 
default_database, MyTable]], fields=[a, b, c, rowtime])
+   +- Exchange(distribution=[hash[a]])
+      +- Calc(select=[a, b, c, window_start, window_end])
+         +- WindowTableFunction(window=[TUMBLE(time_col=[rowtime], size=[15 
min])])
+            +- WatermarkAssigner(rowtime=[rowtime], watermark=[-(rowtime, 
1000:INTERVAL SECOND)])
+               +- Calc(select=[a, b, c, rowtime, PROCTIME() AS proctime])
+                  +- TableSourceScan(table=[[default_catalog, 
default_database, MyTable2]], fields=[a, b, c, rowtime])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testCantMergeWindowTVF_TumbleOnProctime">
+    <Resource name="sql">
+      <![CDATA[
+SELECT L.a, L.b, L.c, R.a, R.b, R.c
+FROM (
+  SELECT *
+  FROM TABLE(TUMBLE(TABLE MyTable, DESCRIPTOR(proctime), INTERVAL '15' MINUTE))
+) L
+JOIN (
+  SELECT *
+  FROM TABLE(TUMBLE(TABLE MyTable2, DESCRIPTOR(proctime), INTERVAL '15' 
MINUTE))
+) R
+ON L.window_start = R.window_start AND L.window_end = R.window_end AND L.a = 
R.a
+      ]]>
+    </Resource>
+    <Resource name="ast">
+      <![CDATA[
+LogicalProject(a=[$0], b=[$1], c=[$2], a0=[$8], b0=[$9], c0=[$10])
++- LogicalJoin(condition=[AND(=($5, $13), =($6, $14), =($0, $8))], 
joinType=[inner])
+   :- LogicalProject(a=[$0], b=[$1], c=[$2], rowtime=[$3], proctime=[$4], 
window_start=[$5], window_end=[$6], window_time=[$7])
+   :  +- LogicalTableFunctionScan(invocation=[TUMBLE($4, DESCRIPTOR($4), 
900000:INTERVAL MINUTE)], rowType=[RecordType(INTEGER a, VARCHAR(2147483647) b, 
BIGINT c, TIME ATTRIBUTE(ROWTIME) rowtime, TIME ATTRIBUTE(PROCTIME) proctime, 
TIMESTAMP(3) window_start, TIMESTAMP(3) window_end, TIME ATTRIBUTE(PROCTIME) 
window_time)])
+   :     +- LogicalProject(a=[$0], b=[$1], c=[$2], rowtime=[$3], proctime=[$4])
+   :        +- LogicalWatermarkAssigner(rowtime=[rowtime], watermark=[-($3, 
1000:INTERVAL SECOND)])
+   :           +- LogicalProject(a=[$0], b=[$1], c=[$2], rowtime=[$3], 
proctime=[PROCTIME()])
+   :              +- LogicalTableScan(table=[[default_catalog, 
default_database, MyTable]])
+   +- LogicalProject(a=[$0], b=[$1], c=[$2], rowtime=[$3], proctime=[$4], 
window_start=[$5], window_end=[$6], window_time=[$7])
+      +- LogicalTableFunctionScan(invocation=[TUMBLE($4, DESCRIPTOR($4), 
900000:INTERVAL MINUTE)], rowType=[RecordType(INTEGER a, VARCHAR(2147483647) b, 
BIGINT c, TIME ATTRIBUTE(ROWTIME) rowtime, TIME ATTRIBUTE(PROCTIME) proctime, 
TIMESTAMP(3) window_start, TIMESTAMP(3) window_end, TIME ATTRIBUTE(PROCTIME) 
window_time)])
+         +- LogicalProject(a=[$0], b=[$1], c=[$2], rowtime=[$3], proctime=[$4])
+            +- LogicalWatermarkAssigner(rowtime=[rowtime], watermark=[-($3, 
1000:INTERVAL SECOND)])
+               +- LogicalProject(a=[$0], b=[$1], c=[$2], rowtime=[$3], 
proctime=[PROCTIME()])
+                  +- LogicalTableScan(table=[[default_catalog, 
default_database, MyTable2]])
+]]>
+    </Resource>
+    <Resource name="optimized rel plan">
+      <![CDATA[
+Calc(select=[a, b, c, a0, b0, c0])
++- WindowJoin(leftWindow=[TUMBLE(win_start=[window_start], 
win_end=[window_end], size=[15 min])], 
rightWindow=[TUMBLE(win_start=[window_start], win_end=[window_end], size=[15 
min])], joinType=[InnerJoin], where=[=(a, a0)], select=[a, b, c, window_start, 
window_end, a0, b0, c0, window_start0, window_end0])
+   :- Exchange(distribution=[hash[a]])
+   :  +- Calc(select=[a, b, c, window_start, window_end])
+   :     +- WindowTableFunction(window=[TUMBLE(time_col=[proctime], size=[15 
min])])
+   :        +- WatermarkAssigner(rowtime=[rowtime], watermark=[-(rowtime, 
1000:INTERVAL SECOND)])
+   :           +- Calc(select=[a, b, c, rowtime, PROCTIME() AS proctime])
+   :              +- TableSourceScan(table=[[default_catalog, 
default_database, MyTable]], fields=[a, b, c, rowtime])
+   +- Exchange(distribution=[hash[a]])
+      +- Calc(select=[a, b, c, window_start, window_end])
+         +- WindowTableFunction(window=[TUMBLE(time_col=[proctime], size=[15 
min])])
+            +- WatermarkAssigner(rowtime=[rowtime], watermark=[-(rowtime, 
1000:INTERVAL SECOND)])
+               +- Calc(select=[a, b, c, rowtime, PROCTIME() AS proctime])
+                  +- TableSourceScan(table=[[default_catalog, 
default_database, MyTable2]], fields=[a, b, c, rowtime])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testOnCumulateWindowAggregate">
+    <Resource name="sql">
+      <![CDATA[
+SELECT L.*, R.*
+FROM (
+  SELECT
+    a,
+    window_start,
+    window_end,
+    window_time,
+    count(*) as cnt,
+    count(distinct c) AS uv
+  FROM TABLE(
+    CUMULATE(
+      TABLE MyTable, DESCRIPTOR(rowtime), INTERVAL '10' MINUTE, INTERVAL '1' 
HOUR))
+  GROUP BY a, window_start, window_end, window_time
+) L
+JOIN (
+  SELECT
+    a,
+    window_start,
+    window_end,
+    window_time,
+    count(*) as cnt,
+    count(distinct c) AS uv
+  FROM TABLE(
+    CUMULATE(
+      TABLE MyTable2, DESCRIPTOR(rowtime), INTERVAL '10' MINUTE, INTERVAL '1' 
HOUR))
+  GROUP BY a, window_start, window_end, window_time
+) R
+ON L.window_start = R.window_start AND L.window_end = R.window_end AND L.a = 
R.a
+      ]]>
+    </Resource>
+    <Resource name="ast">
+      <![CDATA[
+LogicalProject(a=[$0], window_start=[$1], window_end=[$2], window_time=[$3], 
cnt=[$4], uv=[$5], a0=[$6], window_start0=[$7], window_end0=[$8], 
window_time0=[$9], cnt0=[$10], uv0=[$11])
++- LogicalJoin(condition=[AND(=($1, $7), =($2, $8), =($0, $6))], 
joinType=[inner])
+   :- LogicalAggregate(group=[{0, 1, 2, 3}], cnt=[COUNT()], uv=[COUNT(DISTINCT 
$4)])
+   :  +- LogicalProject(a=[$0], window_start=[$5], window_end=[$6], 
window_time=[$7], c=[$2])
+   :     +- LogicalTableFunctionScan(invocation=[CUMULATE($4, DESCRIPTOR($3), 
600000:INTERVAL MINUTE, 3600000:INTERVAL HOUR)], rowType=[RecordType(INTEGER a, 
VARCHAR(2147483647) b, BIGINT c, TIME ATTRIBUTE(ROWTIME) rowtime, TIME 
ATTRIBUTE(PROCTIME) proctime, TIMESTAMP(3) window_start, TIMESTAMP(3) 
window_end, TIME ATTRIBUTE(ROWTIME) window_time)])
+   :        +- LogicalProject(a=[$0], b=[$1], c=[$2], rowtime=[$3], 
proctime=[$4])
+   :           +- LogicalWatermarkAssigner(rowtime=[rowtime], watermark=[-($3, 
1000:INTERVAL SECOND)])
+   :              +- LogicalProject(a=[$0], b=[$1], c=[$2], rowtime=[$3], 
proctime=[PROCTIME()])
+   :                 +- LogicalTableScan(table=[[default_catalog, 
default_database, MyTable]])
+   +- LogicalAggregate(group=[{0, 1, 2, 3}], cnt=[COUNT()], uv=[COUNT(DISTINCT 
$4)])
+      +- LogicalProject(a=[$0], window_start=[$5], window_end=[$6], 
window_time=[$7], c=[$2])
+         +- LogicalTableFunctionScan(invocation=[CUMULATE($4, DESCRIPTOR($3), 
600000:INTERVAL MINUTE, 3600000:INTERVAL HOUR)], rowType=[RecordType(INTEGER a, 
VARCHAR(2147483647) b, BIGINT c, TIME ATTRIBUTE(ROWTIME) rowtime, TIME 
ATTRIBUTE(PROCTIME) proctime, TIMESTAMP(3) window_start, TIMESTAMP(3) 
window_end, TIME ATTRIBUTE(ROWTIME) window_time)])
+            +- LogicalProject(a=[$0], b=[$1], c=[$2], rowtime=[$3], 
proctime=[$4])
+               +- LogicalWatermarkAssigner(rowtime=[rowtime], watermark=[-($3, 
1000:INTERVAL SECOND)])
+                  +- LogicalProject(a=[$0], b=[$1], c=[$2], rowtime=[$3], 
proctime=[PROCTIME()])
+                     +- LogicalTableScan(table=[[default_catalog, 
default_database, MyTable2]])
+]]>
+    </Resource>
+    <Resource name="optimized rel plan">
+      <![CDATA[
+WindowJoin(leftWindow=[CUMULATE(win_start=[window_start], 
win_end=[window_end], max_size=[1 h], step=[10 min])], 
rightWindow=[CUMULATE(win_start=[window_start], win_end=[window_end], 
max_size=[1 h], step=[10 min])], joinType=[InnerJoin], where=[=(a, a0)], 
select=[a, window_start, window_end, window_time, cnt, uv, a0, window_start0, 
window_end0, window_time0, cnt0, uv0])
+:- Exchange(distribution=[hash[a]])
+:  +- Calc(select=[a, window_start, window_end, window_time, cnt, uv])
+:     +- WindowAggregate(groupBy=[a], window=[CUMULATE(time_col=[rowtime], 
max_size=[1 h], step=[10 min])], select=[a, COUNT(*) AS cnt, COUNT(DISTINCT c) 
AS uv, start('w$) AS window_start, end('w$) AS window_end, rowtime('w$) AS 
window_time])
+:        +- Exchange(distribution=[hash[a]])
+:           +- Calc(select=[a, c, rowtime])
+:              +- WatermarkAssigner(rowtime=[rowtime], watermark=[-(rowtime, 
1000:INTERVAL SECOND)])
+:                 +- Calc(select=[a, b, c, rowtime, PROCTIME() AS proctime])
+:                    +- TableSourceScan(table=[[default_catalog, 
default_database, MyTable]], fields=[a, b, c, rowtime])
++- Exchange(distribution=[hash[a]])
+   +- Calc(select=[a, window_start, window_end, window_time, cnt, uv])
+      +- WindowAggregate(groupBy=[a], window=[CUMULATE(time_col=[rowtime], 
max_size=[1 h], step=[10 min])], select=[a, COUNT(*) AS cnt, COUNT(DISTINCT c) 
AS uv, start('w$) AS window_start, end('w$) AS window_end, rowtime('w$) AS 
window_time])
+         +- Exchange(distribution=[hash[a]])
+            +- Calc(select=[a, c, rowtime])
+               +- WatermarkAssigner(rowtime=[rowtime], watermark=[-(rowtime, 
1000:INTERVAL SECOND)])
+                  +- Calc(select=[a, b, c, rowtime, PROCTIME() AS proctime])
+                     +- TableSourceScan(table=[[default_catalog, 
default_database, MyTable2]], fields=[a, b, c, rowtime])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testTimeAttributePropagateForWindowJoin1">
+    <Resource name="sql">
+      <![CDATA[
+SELECT tmp1.*, MyTable4.* FROM tmp1 JOIN MyTable4 ON
+ tmp1.a = MyTable4.a AND
+ tmp1.rowtime BETWEEN
+   MyTable4.rowtime - INTERVAL '10' SECOND AND
+   MyTable4.rowtime + INTERVAL '1' HOUR
+]]>
+    </Resource>
+    <Resource name="ast">
+      <![CDATA[
+LogicalProject(rowtime=[$0], a=[$1], l_cnt=[$2], l_uv=[$3], r_cnt=[$4], 
r_uv=[$5], a0=[$6], b=[$7], c=[$8], rowtime0=[$9], proctime=[$10])
++- LogicalJoin(condition=[AND(=($1, $6), >=($0, -($9, 10000:INTERVAL SECOND)), 
<=($0, +($9, 3600000:INTERVAL HOUR)))], joinType=[inner])
+   :- LogicalProject(rowtime=[$3], a=[$0], l_cnt=[$4], l_uv=[$5], r_cnt=[$10], 
r_uv=[$11])
+   :  +- LogicalJoin(condition=[AND(=($1, $7), =($2, $8), =($0, $6))], 
joinType=[inner])
+   :     :- LogicalAggregate(group=[{0, 1, 2, 3}], cnt=[COUNT()], 
uv=[COUNT(DISTINCT $4)])
+   :     :  +- LogicalProject(a=[$0], window_start=[$5], window_end=[$6], 
window_time=[$7], c=[$2])
+   :     :     +- LogicalTableFunctionScan(invocation=[CUMULATE($4, 
DESCRIPTOR($3), 600000:INTERVAL MINUTE, 3600000:INTERVAL HOUR)], 
rowType=[RecordType(INTEGER a, VARCHAR(2147483647) b, BIGINT c, TIME 
ATTRIBUTE(ROWTIME) rowtime, TIME ATTRIBUTE(PROCTIME) proctime, TIMESTAMP(3) 
window_start, TIMESTAMP(3) window_end, TIME ATTRIBUTE(ROWTIME) window_time)])
+   :     :        +- LogicalProject(a=[$0], b=[$1], c=[$2], rowtime=[$3], 
proctime=[$4])
+   :     :           +- LogicalWatermarkAssigner(rowtime=[rowtime], 
watermark=[-($3, 1000:INTERVAL SECOND)])
+   :     :              +- LogicalProject(a=[$0], b=[$1], c=[$2], 
rowtime=[$3], proctime=[PROCTIME()])
+   :     :                 +- LogicalTableScan(table=[[default_catalog, 
default_database, MyTable]])
+   :     +- LogicalAggregate(group=[{0, 1, 2, 3}], cnt=[COUNT()], 
uv=[COUNT(DISTINCT $4)])
+   :        +- LogicalProject(a=[$0], window_start=[$5], window_end=[$6], 
window_time=[$7], c=[$2])
+   :           +- LogicalTableFunctionScan(invocation=[CUMULATE($4, 
DESCRIPTOR($3), 600000:INTERVAL MINUTE, 3600000:INTERVAL HOUR)], 
rowType=[RecordType(INTEGER a, VARCHAR(2147483647) b, BIGINT c, TIME 
ATTRIBUTE(ROWTIME) rowtime, TIME ATTRIBUTE(PROCTIME) proctime, TIMESTAMP(3) 
window_start, TIMESTAMP(3) window_end, TIME ATTRIBUTE(ROWTIME) window_time)])
+   :              +- LogicalProject(a=[$0], b=[$1], c=[$2], rowtime=[$3], 
proctime=[$4])
+   :                 +- LogicalWatermarkAssigner(rowtime=[rowtime], 
watermark=[-($3, 1000:INTERVAL SECOND)])
+   :                    +- LogicalProject(a=[$0], b=[$1], c=[$2], 
rowtime=[$3], proctime=[PROCTIME()])
+   :                       +- LogicalTableScan(table=[[default_catalog, 
default_database, MyTable2]])
+   +- LogicalWatermarkAssigner(rowtime=[rowtime], watermark=[-($3, 
1000:INTERVAL SECOND)])
+      +- LogicalProject(a=[$0], b=[$1], c=[$2], rowtime=[$3], 
proctime=[PROCTIME()])
+         +- LogicalTableScan(table=[[default_catalog, default_database, 
MyTable4]])
+]]>
+    </Resource>
+    <Resource name="optimized rel plan">
+      <![CDATA[
+Calc(select=[rowtime, a, l_cnt, l_uv, r_cnt, r_uv, a0, b, c, rowtime0, 
PROCTIME_MATERIALIZE(proctime) AS proctime])
++- IntervalJoin(joinType=[InnerJoin], windowBounds=[isRowTime=true, 
leftLowerBound=-10000, leftUpperBound=3600000, leftTimeIndex=0, 
rightTimeIndex=3], where=[AND(=(a, a0), >=(rowtime, -(rowtime0, 10000:INTERVAL 
SECOND)), <=(rowtime, +(rowtime0, 3600000:INTERVAL HOUR)))], select=[rowtime, 
a, l_cnt, l_uv, r_cnt, r_uv, a0, b, c, rowtime0, proctime])
+   :- Exchange(distribution=[hash[a]])
+   :  +- Calc(select=[window_time AS rowtime, a, cnt AS l_cnt, uv AS l_uv, 
cnt0 AS r_cnt, uv0 AS r_uv])
+   :     +- WindowJoin(leftWindow=[CUMULATE(win_start=[window_start], 
win_end=[window_end], max_size=[1 h], step=[10 min])], 
rightWindow=[CUMULATE(win_start=[window_start], win_end=[window_end], 
max_size=[1 h], step=[10 min])], joinType=[InnerJoin], where=[=(a, a0)], 
select=[a, window_start, window_end, window_time, cnt, uv, a0, window_start0, 
window_end0, cnt0, uv0])
+   :        :- Exchange(distribution=[hash[a]])
+   :        :  +- Calc(select=[a, window_start, window_end, window_time, cnt, 
uv])
+   :        :     +- WindowAggregate(groupBy=[a], 
window=[CUMULATE(time_col=[rowtime], max_size=[1 h], step=[10 min])], 
select=[a, COUNT(*) AS cnt, COUNT(DISTINCT c) AS uv, start('w$) AS 
window_start, end('w$) AS window_end, rowtime('w$) AS window_time])
+   :        :        +- Exchange(distribution=[hash[a]])
+   :        :           +- Calc(select=[a, c, rowtime])
+   :        :              +- WatermarkAssigner(rowtime=[rowtime], 
watermark=[-(rowtime, 1000:INTERVAL SECOND)])
+   :        :                 +- Calc(select=[a, b, c, rowtime, PROCTIME() AS 
proctime])
+   :        :                    +- TableSourceScan(table=[[default_catalog, 
default_database, MyTable]], fields=[a, b, c, rowtime])
+   :        +- Exchange(distribution=[hash[a]])
+   :           +- Calc(select=[a, window_start, window_end, cnt, uv])
+   :              +- WindowAggregate(groupBy=[a], 
window=[CUMULATE(time_col=[rowtime], max_size=[1 h], step=[10 min])], 
select=[a, COUNT(*) AS cnt, COUNT(DISTINCT c) AS uv, start('w$) AS 
window_start, end('w$) AS window_end, rowtime('w$) AS window_time])
+   :                 +- Exchange(distribution=[hash[a]])
+   :                    +- Calc(select=[a, c, rowtime])
+   :                       +- WatermarkAssigner(rowtime=[rowtime], 
watermark=[-(rowtime, 1000:INTERVAL SECOND)])
+   :                          +- Calc(select=[a, b, c, rowtime, PROCTIME() AS 
proctime])
+   :                             +- TableSourceScan(table=[[default_catalog, 
default_database, MyTable2]], fields=[a, b, c, rowtime])
+   +- Exchange(distribution=[hash[a]])
+      +- WatermarkAssigner(rowtime=[rowtime], watermark=[-(rowtime, 
1000:INTERVAL SECOND)])
+         +- Calc(select=[a, b, c, rowtime, PROCTIME() AS proctime])
+            +- TableSourceScan(table=[[default_catalog, default_database, 
MyTable4]], fields=[a, b, c, rowtime])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testOnCumulateWindowAggregateOnProctime">
+    <Resource name="sql">
+      <![CDATA[
+SELECT L.*, R.*
+FROM (
+  SELECT
+    a,
+    window_start,
+    window_end,
+    window_time,
+    count(*) as cnt,
+    count(distinct c) AS uv
+  FROM TABLE(
+    CUMULATE(
+      TABLE MyTable, DESCRIPTOR(proctime), INTERVAL '10' MINUTE, INTERVAL '1' 
HOUR))
+  GROUP BY a, window_start, window_end, window_time
+) L
+JOIN (
+  SELECT
+    a,
+    window_start,
+    window_end,
+    window_time,
+    count(*) as cnt,
+    count(distinct c) AS uv
+  FROM TABLE(
+    CUMULATE(
+      TABLE MyTable2, DESCRIPTOR(proctime), INTERVAL '10' MINUTE, INTERVAL '1' 
HOUR))
+  GROUP BY a, window_start, window_end, window_time
+) R
+ON L.window_start = R.window_start AND L.window_end = R.window_end AND L.a = 
R.a
+      ]]>
+    </Resource>
+    <Resource name="ast">
+      <![CDATA[
+LogicalProject(a=[$0], window_start=[$1], window_end=[$2], window_time=[$3], 
cnt=[$4], uv=[$5], a0=[$6], window_start0=[$7], window_end0=[$8], 
window_time0=[$9], cnt0=[$10], uv0=[$11])
++- LogicalJoin(condition=[AND(=($1, $7), =($2, $8), =($0, $6))], 
joinType=[inner])
+   :- LogicalAggregate(group=[{0, 1, 2, 3}], cnt=[COUNT()], uv=[COUNT(DISTINCT 
$4)])
+   :  +- LogicalProject(a=[$0], window_start=[$5], window_end=[$6], 
window_time=[$7], c=[$2])
+   :     +- LogicalTableFunctionScan(invocation=[CUMULATE($4, DESCRIPTOR($4), 
600000:INTERVAL MINUTE, 3600000:INTERVAL HOUR)], rowType=[RecordType(INTEGER a, 
VARCHAR(2147483647) b, BIGINT c, TIME ATTRIBUTE(ROWTIME) rowtime, TIME 
ATTRIBUTE(PROCTIME) proctime, TIMESTAMP(3) window_start, TIMESTAMP(3) 
window_end, TIME ATTRIBUTE(PROCTIME) window_time)])
+   :        +- LogicalProject(a=[$0], b=[$1], c=[$2], rowtime=[$3], 
proctime=[$4])
+   :           +- LogicalWatermarkAssigner(rowtime=[rowtime], watermark=[-($3, 
1000:INTERVAL SECOND)])
+   :              +- LogicalProject(a=[$0], b=[$1], c=[$2], rowtime=[$3], 
proctime=[PROCTIME()])
+   :                 +- LogicalTableScan(table=[[default_catalog, 
default_database, MyTable]])
+   +- LogicalAggregate(group=[{0, 1, 2, 3}], cnt=[COUNT()], uv=[COUNT(DISTINCT 
$4)])
+      +- LogicalProject(a=[$0], window_start=[$5], window_end=[$6], 
window_time=[$7], c=[$2])
+         +- LogicalTableFunctionScan(invocation=[CUMULATE($4, DESCRIPTOR($4), 
600000:INTERVAL MINUTE, 3600000:INTERVAL HOUR)], rowType=[RecordType(INTEGER a, 
VARCHAR(2147483647) b, BIGINT c, TIME ATTRIBUTE(ROWTIME) rowtime, TIME 
ATTRIBUTE(PROCTIME) proctime, TIMESTAMP(3) window_start, TIMESTAMP(3) 
window_end, TIME ATTRIBUTE(PROCTIME) window_time)])
+            +- LogicalProject(a=[$0], b=[$1], c=[$2], rowtime=[$3], 
proctime=[$4])
+               +- LogicalWatermarkAssigner(rowtime=[rowtime], watermark=[-($3, 
1000:INTERVAL SECOND)])
+                  +- LogicalProject(a=[$0], b=[$1], c=[$2], rowtime=[$3], 
proctime=[PROCTIME()])
+                     +- LogicalTableScan(table=[[default_catalog, 
default_database, MyTable2]])
+]]>
+    </Resource>
+    <Resource name="optimized rel plan">
+      <![CDATA[
+Calc(select=[a, window_start, window_end, PROCTIME_MATERIALIZE(window_time) AS 
window_time, cnt, uv, a0, window_start0, window_end0, 
PROCTIME_MATERIALIZE(window_time0) AS window_time0, cnt0, uv0])
++- WindowJoin(leftWindow=[CUMULATE(win_start=[window_start], 
win_end=[window_end], max_size=[1 h], step=[10 min])], 
rightWindow=[CUMULATE(win_start=[window_start], win_end=[window_end], 
max_size=[1 h], step=[10 min])], joinType=[InnerJoin], where=[=(a, a0)], 
select=[a, window_start, window_end, window_time, cnt, uv, a0, window_start0, 
window_end0, window_time0, cnt0, uv0])
+   :- Exchange(distribution=[hash[a]])
+   :  +- Calc(select=[a, window_start, window_end, window_time, cnt, uv])
+   :     +- WindowAggregate(groupBy=[a], window=[CUMULATE(time_col=[proctime], 
max_size=[1 h], step=[10 min])], select=[a, COUNT(*) AS cnt, COUNT(DISTINCT c) 
AS uv, start('w$) AS window_start, end('w$) AS window_end, proctime('w$) AS 
window_time])
+   :        +- Exchange(distribution=[hash[a]])
+   :           +- Calc(select=[a, c, proctime])
+   :              +- WatermarkAssigner(rowtime=[rowtime], 
watermark=[-(rowtime, 1000:INTERVAL SECOND)])
+   :                 +- Calc(select=[a, b, c, rowtime, PROCTIME() AS proctime])
+   :                    +- TableSourceScan(table=[[default_catalog, 
default_database, MyTable]], fields=[a, b, c, rowtime])
+   +- Exchange(distribution=[hash[a]])
+      +- Calc(select=[a, window_start, window_end, window_time, cnt, uv])
+         +- WindowAggregate(groupBy=[a], window=[CUMULATE(time_col=[proctime], 
max_size=[1 h], step=[10 min])], select=[a, COUNT(*) AS cnt, COUNT(DISTINCT c) 
AS uv, start('w$) AS window_start, end('w$) AS window_end, proctime('w$) AS 
window_time])
+            +- Exchange(distribution=[hash[a]])
+               +- Calc(select=[a, c, proctime])
+                  +- WatermarkAssigner(rowtime=[rowtime], 
watermark=[-(rowtime, 1000:INTERVAL SECOND)])
+                     +- Calc(select=[a, b, c, rowtime, PROCTIME() AS proctime])
+                        +- TableSourceScan(table=[[default_catalog, 
default_database, MyTable2]], fields=[a, b, c, rowtime])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testOnHopWindowAggregate">
+    <Resource name="sql">
+      <![CDATA[
+SELECT L.*, R.*
+FROM (
+  SELECT
+    a,
+    window_start,
+    window_end,
+    window_time,
+    count(*) as cnt,
+    count(distinct c) AS uv
+  FROM TABLE(
+  HOP(TABLE MyTable, DESCRIPTOR(rowtime), INTERVAL '5' MINUTE, INTERVAL '10' 
MINUTE))
+  GROUP BY a, window_start, window_end, window_time
+) L
+JOIN (
+  SELECT
+    a,
+    window_start,
+    window_end,
+    window_time,
+    count(*) as cnt,
+    count(distinct c) AS uv
+  FROM TABLE(
+  HOP(TABLE MyTable2, DESCRIPTOR(rowtime), INTERVAL '5' MINUTE, INTERVAL '10' 
MINUTE))
+  GROUP BY a, window_start, window_end, window_time
+) R
+ON L.window_start = R.window_start AND L.window_end = R.window_end AND L.a = 
R.a
+      ]]>
+    </Resource>
+    <Resource name="ast">
+      <![CDATA[
+LogicalProject(a=[$0], window_start=[$1], window_end=[$2], window_time=[$3], 
cnt=[$4], uv=[$5], a0=[$6], window_start0=[$7], window_end0=[$8], 
window_time0=[$9], cnt0=[$10], uv0=[$11])
++- LogicalJoin(condition=[AND(=($1, $7), =($2, $8), =($0, $6))], 
joinType=[inner])
+   :- LogicalAggregate(group=[{0, 1, 2, 3}], cnt=[COUNT()], uv=[COUNT(DISTINCT 
$4)])
+   :  +- LogicalProject(a=[$0], window_start=[$5], window_end=[$6], 
window_time=[$7], c=[$2])
+   :     +- LogicalTableFunctionScan(invocation=[HOP($4, DESCRIPTOR($3), 
300000:INTERVAL MINUTE, 600000:INTERVAL MINUTE)], rowType=[RecordType(INTEGER 
a, VARCHAR(2147483647) b, BIGINT c, TIME ATTRIBUTE(ROWTIME) rowtime, TIME 
ATTRIBUTE(PROCTIME) proctime, TIMESTAMP(3) window_start, TIMESTAMP(3) 
window_end, TIME ATTRIBUTE(ROWTIME) window_time)])
+   :        +- LogicalProject(a=[$0], b=[$1], c=[$2], rowtime=[$3], 
proctime=[$4])
+   :           +- LogicalWatermarkAssigner(rowtime=[rowtime], watermark=[-($3, 
1000:INTERVAL SECOND)])
+   :              +- LogicalProject(a=[$0], b=[$1], c=[$2], rowtime=[$3], 
proctime=[PROCTIME()])
+   :                 +- LogicalTableScan(table=[[default_catalog, 
default_database, MyTable]])
+   +- LogicalAggregate(group=[{0, 1, 2, 3}], cnt=[COUNT()], uv=[COUNT(DISTINCT 
$4)])
+      +- LogicalProject(a=[$0], window_start=[$5], window_end=[$6], 
window_time=[$7], c=[$2])
+         +- LogicalTableFunctionScan(invocation=[HOP($4, DESCRIPTOR($3), 
300000:INTERVAL MINUTE, 600000:INTERVAL MINUTE)], rowType=[RecordType(INTEGER 
a, VARCHAR(2147483647) b, BIGINT c, TIME ATTRIBUTE(ROWTIME) rowtime, TIME 
ATTRIBUTE(PROCTIME) proctime, TIMESTAMP(3) window_start, TIMESTAMP(3) 
window_end, TIME ATTRIBUTE(ROWTIME) window_time)])
+            +- LogicalProject(a=[$0], b=[$1], c=[$2], rowtime=[$3], 
proctime=[$4])
+               +- LogicalWatermarkAssigner(rowtime=[rowtime], watermark=[-($3, 
1000:INTERVAL SECOND)])
+                  +- LogicalProject(a=[$0], b=[$1], c=[$2], rowtime=[$3], 
proctime=[PROCTIME()])
+                     +- LogicalTableScan(table=[[default_catalog, 
default_database, MyTable2]])
+]]>
+    </Resource>
+    <Resource name="optimized rel plan">
+      <![CDATA[
+WindowJoin(leftWindow=[HOP(win_start=[window_start], win_end=[window_end], 
size=[10 min], slide=[5 min])], rightWindow=[HOP(win_start=[window_start], 
win_end=[window_end], size=[10 min], slide=[5 min])], joinType=[InnerJoin], 
where=[=(a, a0)], select=[a, window_start, window_end, window_time, cnt, uv, 
a0, window_start0, window_end0, window_time0, cnt0, uv0])
+:- Exchange(distribution=[hash[a]])
+:  +- Calc(select=[a, window_start, window_end, window_time, cnt, uv])
+:     +- WindowAggregate(groupBy=[a], window=[HOP(time_col=[rowtime], size=[10 
min], slide=[5 min])], select=[a, COUNT(*) AS cnt, COUNT(DISTINCT c) AS uv, 
start('w$) AS window_start, end('w$) AS window_end, rowtime('w$) AS 
window_time])
+:        +- Exchange(distribution=[hash[a]])
+:           +- Calc(select=[a, c, rowtime])
+:              +- WatermarkAssigner(rowtime=[rowtime], watermark=[-(rowtime, 
1000:INTERVAL SECOND)])
+:                 +- Calc(select=[a, b, c, rowtime, PROCTIME() AS proctime])
+:                    +- TableSourceScan(table=[[default_catalog, 
default_database, MyTable]], fields=[a, b, c, rowtime])
++- Exchange(distribution=[hash[a]])
+   +- Calc(select=[a, window_start, window_end, window_time, cnt, uv])
+      +- WindowAggregate(groupBy=[a], window=[HOP(time_col=[rowtime], size=[10 
min], slide=[5 min])], select=[a, COUNT(*) AS cnt, COUNT(DISTINCT c) AS uv, 
start('w$) AS window_start, end('w$) AS window_end, rowtime('w$) AS 
window_time])
+         +- Exchange(distribution=[hash[a]])
+            +- Calc(select=[a, c, rowtime])
+               +- WatermarkAssigner(rowtime=[rowtime], watermark=[-(rowtime, 
1000:INTERVAL SECOND)])
+                  +- Calc(select=[a, b, c, rowtime, PROCTIME() AS proctime])
+                     +- TableSourceScan(table=[[default_catalog, 
default_database, MyTable2]], fields=[a, b, c, rowtime])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testOnHopWindowAggregateOnProctime">
+    <Resource name="sql">
+      <![CDATA[
+SELECT L.*, R.*
+FROM (
+  SELECT
+    a,
+    window_start,
+    window_end,
+    window_time,
+    count(*) as cnt,
+    count(distinct c) AS uv
+  FROM TABLE(
+  HOP(TABLE MyTable, DESCRIPTOR(proctime), INTERVAL '5' MINUTE, INTERVAL '10' 
MINUTE))
+  GROUP BY a, window_start, window_end, window_time
+) L
+JOIN (
+  SELECT
+    a,
+    window_start,
+    window_end,
+    window_time,
+    count(*) as cnt,
+    count(distinct c) AS uv
+  FROM TABLE(
+  HOP(TABLE MyTable2, DESCRIPTOR(proctime), INTERVAL '5' MINUTE, INTERVAL '10' 
MINUTE))
+  GROUP BY a, window_start, window_end, window_time
+) R
+ON L.window_start = R.window_start AND L.window_end = R.window_end AND L.a = 
R.a
+      ]]>
+    </Resource>
+    <Resource name="ast">
+      <![CDATA[
+LogicalProject(a=[$0], window_start=[$1], window_end=[$2], window_time=[$3], 
cnt=[$4], uv=[$5], a0=[$6], window_start0=[$7], window_end0=[$8], 
window_time0=[$9], cnt0=[$10], uv0=[$11])
++- LogicalJoin(condition=[AND(=($1, $7), =($2, $8), =($0, $6))], 
joinType=[inner])
+   :- LogicalAggregate(group=[{0, 1, 2, 3}], cnt=[COUNT()], uv=[COUNT(DISTINCT 
$4)])
+   :  +- LogicalProject(a=[$0], window_start=[$5], window_end=[$6], 
window_time=[$7], c=[$2])
+   :     +- LogicalTableFunctionScan(invocation=[HOP($4, DESCRIPTOR($4), 
300000:INTERVAL MINUTE, 600000:INTERVAL MINUTE)], rowType=[RecordType(INTEGER 
a, VARCHAR(2147483647) b, BIGINT c, TIME ATTRIBUTE(ROWTIME) rowtime, TIME 
ATTRIBUTE(PROCTIME) proctime, TIMESTAMP(3) window_start, TIMESTAMP(3) 
window_end, TIME ATTRIBUTE(PROCTIME) window_time)])
+   :        +- LogicalProject(a=[$0], b=[$1], c=[$2], rowtime=[$3], 
proctime=[$4])
+   :           +- LogicalWatermarkAssigner(rowtime=[rowtime], watermark=[-($3, 
1000:INTERVAL SECOND)])
+   :              +- LogicalProject(a=[$0], b=[$1], c=[$2], rowtime=[$3], 
proctime=[PROCTIME()])
+   :                 +- LogicalTableScan(table=[[default_catalog, 
default_database, MyTable]])
+   +- LogicalAggregate(group=[{0, 1, 2, 3}], cnt=[COUNT()], uv=[COUNT(DISTINCT 
$4)])
+      +- LogicalProject(a=[$0], window_start=[$5], window_end=[$6], 
window_time=[$7], c=[$2])
+         +- LogicalTableFunctionScan(invocation=[HOP($4, DESCRIPTOR($4), 
300000:INTERVAL MINUTE, 600000:INTERVAL MINUTE)], rowType=[RecordType(INTEGER 
a, VARCHAR(2147483647) b, BIGINT c, TIME ATTRIBUTE(ROWTIME) rowtime, TIME 
ATTRIBUTE(PROCTIME) proctime, TIMESTAMP(3) window_start, TIMESTAMP(3) 
window_end, TIME ATTRIBUTE(PROCTIME) window_time)])
+            +- LogicalProject(a=[$0], b=[$1], c=[$2], rowtime=[$3], 
proctime=[$4])
+               +- LogicalWatermarkAssigner(rowtime=[rowtime], watermark=[-($3, 
1000:INTERVAL SECOND)])
+                  +- LogicalProject(a=[$0], b=[$1], c=[$2], rowtime=[$3], 
proctime=[PROCTIME()])
+                     +- LogicalTableScan(table=[[default_catalog, 
default_database, MyTable2]])
+]]>
+    </Resource>
+    <Resource name="optimized rel plan">
+      <![CDATA[
+Calc(select=[a, window_start, window_end, PROCTIME_MATERIALIZE(window_time) AS 
window_time, cnt, uv, a0, window_start0, window_end0, 
PROCTIME_MATERIALIZE(window_time0) AS window_time0, cnt0, uv0])
++- WindowJoin(leftWindow=[HOP(win_start=[window_start], win_end=[window_end], 
size=[10 min], slide=[5 min])], rightWindow=[HOP(win_start=[window_start], 
win_end=[window_end], size=[10 min], slide=[5 min])], joinType=[InnerJoin], 
where=[=(a, a0)], select=[a, window_start, window_end, window_time, cnt, uv, 
a0, window_start0, window_end0, window_time0, cnt0, uv0])
+   :- Exchange(distribution=[hash[a]])
+   :  +- Calc(select=[a, window_start, window_end, window_time, cnt, uv])
+   :     +- WindowAggregate(groupBy=[a], window=[HOP(time_col=[proctime], 
size=[10 min], slide=[5 min])], select=[a, COUNT(*) AS cnt, COUNT(DISTINCT c) 
AS uv, start('w$) AS window_start, end('w$) AS window_end, proctime('w$) AS 
window_time])
+   :        +- Exchange(distribution=[hash[a]])
+   :           +- Calc(select=[a, c, proctime])
+   :              +- WatermarkAssigner(rowtime=[rowtime], 
watermark=[-(rowtime, 1000:INTERVAL SECOND)])
+   :                 +- Calc(select=[a, b, c, rowtime, PROCTIME() AS proctime])
+   :                    +- TableSourceScan(table=[[default_catalog, 
default_database, MyTable]], fields=[a, b, c, rowtime])
+   +- Exchange(distribution=[hash[a]])
+      +- Calc(select=[a, window_start, window_end, window_time, cnt, uv])
+         +- WindowAggregate(groupBy=[a], window=[HOP(time_col=[proctime], 
size=[10 min], slide=[5 min])], select=[a, COUNT(*) AS cnt, COUNT(DISTINCT c) 
AS uv, start('w$) AS window_start, end('w$) AS window_end, proctime('w$) AS 
window_time])
+            +- Exchange(distribution=[hash[a]])
+               +- Calc(select=[a, c, proctime])
+                  +- WatermarkAssigner(rowtime=[rowtime], 
watermark=[-(rowtime, 1000:INTERVAL SECOND)])
+                     +- Calc(select=[a, b, c, rowtime, PROCTIME() AS proctime])
+                        +- TableSourceScan(table=[[default_catalog, 
default_database, MyTable2]], fields=[a, b, c, rowtime])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testOnTumbleWindowAggregate">
+    <Resource name="sql">
+      <![CDATA[
+SELECT L.*, R.*
+FROM (
+  SELECT
+    a,
+    window_start,
+    window_end,
+    window_time,
+    count(*) as cnt,
+    count(distinct c) AS uv
+  FROM TABLE(TUMBLE(TABLE MyTable, DESCRIPTOR(rowtime), INTERVAL '15' MINUTE))
+  GROUP BY a, window_start, window_end, window_time
+) L
+JOIN (
+  SELECT
+    a,
+    window_start,
+    window_end,
+    window_time,
+    count(*) as cnt,
+    count(distinct c) AS uv
+  FROM TABLE(TUMBLE(TABLE MyTable2, DESCRIPTOR(rowtime), INTERVAL '15' MINUTE))
+  GROUP BY a, window_start, window_end, window_time
+) R
+ON L.window_start = R.window_start AND L.window_end = R.window_end AND L.a = 
R.a
+      ]]>
+    </Resource>
+    <Resource name="ast">
+      <![CDATA[
+LogicalProject(a=[$0], window_start=[$1], window_end=[$2], window_time=[$3], 
cnt=[$4], uv=[$5], a0=[$6], window_start0=[$7], window_end0=[$8], 
window_time0=[$9], cnt0=[$10], uv0=[$11])
++- LogicalJoin(condition=[AND(=($1, $7), =($2, $8), =($0, $6))], 
joinType=[inner])
+   :- LogicalAggregate(group=[{0, 1, 2, 3}], cnt=[COUNT()], uv=[COUNT(DISTINCT 
$4)])
+   :  +- LogicalProject(a=[$0], window_start=[$5], window_end=[$6], 
window_time=[$7], c=[$2])
+   :     +- LogicalTableFunctionScan(invocation=[TUMBLE($4, DESCRIPTOR($3), 
900000:INTERVAL MINUTE)], rowType=[RecordType(INTEGER a, VARCHAR(2147483647) b, 
BIGINT c, TIME ATTRIBUTE(ROWTIME) rowtime, TIME ATTRIBUTE(PROCTIME) proctime, 
TIMESTAMP(3) window_start, TIMESTAMP(3) window_end, TIME ATTRIBUTE(ROWTIME) 
window_time)])
+   :        +- LogicalProject(a=[$0], b=[$1], c=[$2], rowtime=[$3], 
proctime=[$4])
+   :           +- LogicalWatermarkAssigner(rowtime=[rowtime], watermark=[-($3, 
1000:INTERVAL SECOND)])
+   :              +- LogicalProject(a=[$0], b=[$1], c=[$2], rowtime=[$3], 
proctime=[PROCTIME()])
+   :                 +- LogicalTableScan(table=[[default_catalog, 
default_database, MyTable]])
+   +- LogicalAggregate(group=[{0, 1, 2, 3}], cnt=[COUNT()], uv=[COUNT(DISTINCT 
$4)])
+      +- LogicalProject(a=[$0], window_start=[$5], window_end=[$6], 
window_time=[$7], c=[$2])
+         +- LogicalTableFunctionScan(invocation=[TUMBLE($4, DESCRIPTOR($3), 
900000:INTERVAL MINUTE)], rowType=[RecordType(INTEGER a, VARCHAR(2147483647) b, 
BIGINT c, TIME ATTRIBUTE(ROWTIME) rowtime, TIME ATTRIBUTE(PROCTIME) proctime, 
TIMESTAMP(3) window_start, TIMESTAMP(3) window_end, TIME ATTRIBUTE(ROWTIME) 
window_time)])
+            +- LogicalProject(a=[$0], b=[$1], c=[$2], rowtime=[$3], 
proctime=[$4])
+               +- LogicalWatermarkAssigner(rowtime=[rowtime], watermark=[-($3, 
1000:INTERVAL SECOND)])
+                  +- LogicalProject(a=[$0], b=[$1], c=[$2], rowtime=[$3], 
proctime=[PROCTIME()])
+                     +- LogicalTableScan(table=[[default_catalog, 
default_database, MyTable2]])
+]]>
+    </Resource>
+    <Resource name="optimized rel plan">
+      <![CDATA[
+WindowJoin(leftWindow=[TUMBLE(win_start=[window_start], win_end=[window_end], 
size=[15 min])], rightWindow=[TUMBLE(win_start=[window_start], 
win_end=[window_end], size=[15 min])], joinType=[InnerJoin], where=[=(a, a0)], 
select=[a, window_start, window_end, window_time, cnt, uv, a0, window_start0, 
window_end0, window_time0, cnt0, uv0])
+:- Exchange(distribution=[hash[a]])
+:  +- Calc(select=[a, window_start, window_end, window_time, cnt, uv])
+:     +- WindowAggregate(groupBy=[a], window=[TUMBLE(time_col=[rowtime], 
size=[15 min])], select=[a, COUNT(*) AS cnt, COUNT(DISTINCT c) AS uv, 
start('w$) AS window_start, end('w$) AS window_end, rowtime('w$) AS 
window_time])
+:        +- Exchange(distribution=[hash[a]])
+:           +- Calc(select=[a, c, rowtime])
+:              +- WatermarkAssigner(rowtime=[rowtime], watermark=[-(rowtime, 
1000:INTERVAL SECOND)])
+:                 +- Calc(select=[a, b, c, rowtime, PROCTIME() AS proctime])
+:                    +- TableSourceScan(table=[[default_catalog, 
default_database, MyTable]], fields=[a, b, c, rowtime])
++- Exchange(distribution=[hash[a]])
+   +- Calc(select=[a, window_start, window_end, window_time, cnt, uv])
+      +- WindowAggregate(groupBy=[a], window=[TUMBLE(time_col=[rowtime], 
size=[15 min])], select=[a, COUNT(*) AS cnt, COUNT(DISTINCT c) AS uv, 
start('w$) AS window_start, end('w$) AS window_end, rowtime('w$) AS 
window_time])
+         +- Exchange(distribution=[hash[a]])
+            +- Calc(select=[a, c, rowtime])
+               +- WatermarkAssigner(rowtime=[rowtime], watermark=[-(rowtime, 
1000:INTERVAL SECOND)])
+                  +- Calc(select=[a, b, c, rowtime, PROCTIME() AS proctime])
+                     +- TableSourceScan(table=[[default_catalog, 
default_database, MyTable2]], fields=[a, b, c, rowtime])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testOnTumbleWindowAggregateOnProctime">
+    <Resource name="sql">
+      <![CDATA[
+SELECT L.*, R.*
+FROM (
+  SELECT
+    a,
+    window_start,
+    window_end,
+    window_time,
+    count(*) as cnt,
+    count(distinct c) AS uv
+  FROM TABLE(TUMBLE(TABLE MyTable, DESCRIPTOR(proctime), INTERVAL '15' MINUTE))
+  GROUP BY a, window_start, window_end, window_time
+) L
+JOIN (
+  SELECT
+    a,
+    window_start,
+    window_end,
+    window_time,
+    count(*) as cnt,
+    count(distinct c) AS uv
+  FROM TABLE(TUMBLE(TABLE MyTable2, DESCRIPTOR(proctime), INTERVAL '15' 
MINUTE))
+  GROUP BY a, window_start, window_end, window_time
+) R
+ON L.window_start = R.window_start AND L.window_end = R.window_end AND L.a = 
R.a
+      ]]>
+    </Resource>
+    <Resource name="ast">
+      <![CDATA[
+LogicalProject(a=[$0], window_start=[$1], window_end=[$2], window_time=[$3], 
cnt=[$4], uv=[$5], a0=[$6], window_start0=[$7], window_end0=[$8], 
window_time0=[$9], cnt0=[$10], uv0=[$11])
++- LogicalJoin(condition=[AND(=($1, $7), =($2, $8), =($0, $6))], 
joinType=[inner])
+   :- LogicalAggregate(group=[{0, 1, 2, 3}], cnt=[COUNT()], uv=[COUNT(DISTINCT 
$4)])
+   :  +- LogicalProject(a=[$0], window_start=[$5], window_end=[$6], 
window_time=[$7], c=[$2])
+   :     +- LogicalTableFunctionScan(invocation=[TUMBLE($4, DESCRIPTOR($4), 
900000:INTERVAL MINUTE)], rowType=[RecordType(INTEGER a, VARCHAR(2147483647) b, 
BIGINT c, TIME ATTRIBUTE(ROWTIME) rowtime, TIME ATTRIBUTE(PROCTIME) proctime, 
TIMESTAMP(3) window_start, TIMESTAMP(3) window_end, TIME ATTRIBUTE(PROCTIME) 
window_time)])
+   :        +- LogicalProject(a=[$0], b=[$1], c=[$2], rowtime=[$3], 
proctime=[$4])
+   :           +- LogicalWatermarkAssigner(rowtime=[rowtime], watermark=[-($3, 
1000:INTERVAL SECOND)])
+   :              +- LogicalProject(a=[$0], b=[$1], c=[$2], rowtime=[$3], 
proctime=[PROCTIME()])
+   :                 +- LogicalTableScan(table=[[default_catalog, 
default_database, MyTable]])
+   +- LogicalAggregate(group=[{0, 1, 2, 3}], cnt=[COUNT()], uv=[COUNT(DISTINCT 
$4)])
+      +- LogicalProject(a=[$0], window_start=[$5], window_end=[$6], 
window_time=[$7], c=[$2])
+         +- LogicalTableFunctionScan(invocation=[TUMBLE($4, DESCRIPTOR($4), 
900000:INTERVAL MINUTE)], rowType=[RecordType(INTEGER a, VARCHAR(2147483647) b, 
BIGINT c, TIME ATTRIBUTE(ROWTIME) rowtime, TIME ATTRIBUTE(PROCTIME) proctime, 
TIMESTAMP(3) window_start, TIMESTAMP(3) window_end, TIME ATTRIBUTE(PROCTIME) 
window_time)])
+            +- LogicalProject(a=[$0], b=[$1], c=[$2], rowtime=[$3], 
proctime=[$4])
+               +- LogicalWatermarkAssigner(rowtime=[rowtime], watermark=[-($3, 
1000:INTERVAL SECOND)])
+                  +- LogicalProject(a=[$0], b=[$1], c=[$2], rowtime=[$3], 
proctime=[PROCTIME()])
+                     +- LogicalTableScan(table=[[default_catalog, 
default_database, MyTable2]])
+]]>
+    </Resource>
+    <Resource name="optimized rel plan">
+      <![CDATA[
+Calc(select=[a, window_start, window_end, PROCTIME_MATERIALIZE(window_time) AS 
window_time, cnt, uv, a0, window_start0, window_end0, 
PROCTIME_MATERIALIZE(window_time0) AS window_time0, cnt0, uv0])
++- WindowJoin(leftWindow=[TUMBLE(win_start=[window_start], 
win_end=[window_end], size=[15 min])], 
rightWindow=[TUMBLE(win_start=[window_start], win_end=[window_end], size=[15 
min])], joinType=[InnerJoin], where=[=(a, a0)], select=[a, window_start, 
window_end, window_time, cnt, uv, a0, window_start0, window_end0, window_time0, 
cnt0, uv0])
+   :- Exchange(distribution=[hash[a]])
+   :  +- Calc(select=[a, window_start, window_end, window_time, cnt, uv])
+   :     +- WindowAggregate(groupBy=[a], window=[TUMBLE(time_col=[proctime], 
size=[15 min])], select=[a, COUNT(*) AS cnt, COUNT(DISTINCT c) AS uv, 
start('w$) AS window_start, end('w$) AS window_end, proctime('w$) AS 
window_time])
+   :        +- Exchange(distribution=[hash[a]])
+   :           +- Calc(select=[a, c, proctime])
+   :              +- WatermarkAssigner(rowtime=[rowtime], 
watermark=[-(rowtime, 1000:INTERVAL SECOND)])
+   :                 +- Calc(select=[a, b, c, rowtime, PROCTIME() AS proctime])
+   :                    +- TableSourceScan(table=[[default_catalog, 
default_database, MyTable]], fields=[a, b, c, rowtime])
+   +- Exchange(distribution=[hash[a]])
+      +- Calc(select=[a, window_start, window_end, window_time, cnt, uv])
+         +- WindowAggregate(groupBy=[a], window=[TUMBLE(time_col=[proctime], 
size=[15 min])], select=[a, COUNT(*) AS cnt, COUNT(DISTINCT c) AS uv, 
start('w$) AS window_start, end('w$) AS window_end, proctime('w$) AS 
window_time])
+            +- Exchange(distribution=[hash[a]])
+               +- Calc(select=[a, c, proctime])
+                  +- WatermarkAssigner(rowtime=[rowtime], 
watermark=[-(rowtime, 1000:INTERVAL SECOND)])
+                     +- Calc(select=[a, b, c, rowtime, PROCTIME() AS proctime])
+                        +- TableSourceScan(table=[[default_catalog, 
default_database, MyTable2]], fields=[a, b, c, rowtime])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testTimeAttributePropagateForWindowJoin">
+    <Resource name="sql">
+      <![CDATA[
+SELECT tmp.*, MyTable3.* FROM tmp JOIN MyTable3 ON
+ tmp.a = MyTable3.a AND
+ tmp.rowtime BETWEEN
+   MyTable3.rowtime - INTERVAL '10' SECOND AND
+   MyTable3.rowtime + INTERVAL '1' HOUR
+]]>
+    </Resource>
+    <Resource name="ast">
+      <![CDATA[
+LogicalProject(rowtime=[$0], a=[$1], l_b=[$2], l_c=[$3], r_b=[$4], r_c=[$5], 
a0=[$6], b=[$7], c=[$8], rowtime0=[$9], proctime=[$10])
++- LogicalJoin(condition=[AND(=($1, $6), >=($0, -($9, 10000:INTERVAL SECOND)), 
<=($0, +($9, 3600000:INTERVAL HOUR)))], joinType=[inner])
+   :- LogicalProject(rowtime=[$7], a=[$0], l_b=[$1], l_c=[$2], r_b=[$9], 
r_c=[$10])
+   :  +- LogicalJoin(condition=[AND(=($5, $13), =($6, $14), =($0, $8))], 
joinType=[inner])
+   :     :- LogicalProject(a=[$0], b=[$1], c=[$2], rowtime=[$3], 
proctime=[$4], window_start=[$5], window_end=[$6], window_time=[$7])
+   :     :  +- LogicalTableFunctionScan(invocation=[TUMBLE($4, DESCRIPTOR($3), 
900000:INTERVAL MINUTE)], rowType=[RecordType(INTEGER a, VARCHAR(2147483647) b, 
BIGINT c, TIME ATTRIBUTE(ROWTIME) rowtime, TIME ATTRIBUTE(PROCTIME) proctime, 
TIMESTAMP(3) window_start, TIMESTAMP(3) window_end, TIME ATTRIBUTE(ROWTIME) 
window_time)])
+   :     :     +- LogicalProject(a=[$0], b=[$1], c=[$2], rowtime=[$3], 
proctime=[$4])
+   :     :        +- LogicalWatermarkAssigner(rowtime=[rowtime], 
watermark=[-($3, 1000:INTERVAL SECOND)])
+   :     :           +- LogicalProject(a=[$0], b=[$1], c=[$2], rowtime=[$3], 
proctime=[PROCTIME()])
+   :     :              +- LogicalTableScan(table=[[default_catalog, 
default_database, MyTable]])
+   :     +- LogicalProject(a=[$0], b=[$1], c=[$2], rowtime=[$3], 
proctime=[$4], window_start=[$5], window_end=[$6], window_time=[$7])
+   :        +- LogicalTableFunctionScan(invocation=[TUMBLE($4, DESCRIPTOR($3), 
900000:INTERVAL MINUTE)], rowType=[RecordType(INTEGER a, VARCHAR(2147483647) b, 
BIGINT c, TIME ATTRIBUTE(ROWTIME) rowtime, TIME ATTRIBUTE(PROCTIME) proctime, 
TIMESTAMP(3) window_start, TIMESTAMP(3) window_end, TIME ATTRIBUTE(ROWTIME) 
window_time)])
+   :           +- LogicalProject(a=[$0], b=[$1], c=[$2], rowtime=[$3], 
proctime=[$4])
+   :              +- LogicalWatermarkAssigner(rowtime=[rowtime], 
watermark=[-($3, 1000:INTERVAL SECOND)])
+   :                 +- LogicalProject(a=[$0], b=[$1], c=[$2], rowtime=[$3], 
proctime=[PROCTIME()])
+   :                    +- LogicalTableScan(table=[[default_catalog, 
default_database, MyTable2]])
+   +- LogicalWatermarkAssigner(rowtime=[rowtime], watermark=[-($3, 
1000:INTERVAL SECOND)])
+      +- LogicalProject(a=[$0], b=[$1], c=[$2], rowtime=[$3], 
proctime=[PROCTIME()])
+         +- LogicalTableScan(table=[[default_catalog, default_database, 
MyTable3]])
+]]>
+    </Resource>
+    <Resource name="optimized rel plan">
+      <![CDATA[
+Calc(select=[rowtime, a, l_b, l_c, r_b, r_c, a0, b, c, rowtime0, 
PROCTIME_MATERIALIZE(proctime) AS proctime])
++- IntervalJoin(joinType=[InnerJoin], windowBounds=[isRowTime=true, 
leftLowerBound=-10000, leftUpperBound=3600000, leftTimeIndex=0, 
rightTimeIndex=3], where=[AND(=(a, a0), >=(rowtime, -(rowtime0, 10000:INTERVAL 
SECOND)), <=(rowtime, +(rowtime0, 3600000:INTERVAL HOUR)))], select=[rowtime, 
a, l_b, l_c, r_b, r_c, a0, b, c, rowtime0, proctime])
+   :- Exchange(distribution=[hash[a]])
+   :  +- Calc(select=[window_time AS rowtime, a, b AS l_b, c AS l_c, b0 AS 
r_b, c0 AS r_c])
+   :     +- WindowJoin(leftWindow=[TUMBLE(win_start=[window_start], 
win_end=[window_end], size=[15 min])], 
rightWindow=[TUMBLE(win_start=[window_start], win_end=[window_end], size=[15 
min])], joinType=[InnerJoin], where=[=(a, a0)], select=[a, b, c, window_start, 
window_end, window_time, a0, b0, c0, window_start0, window_end0])
+   :        :- Exchange(distribution=[hash[a]])
+   :        :  +- Calc(select=[a, b, c, window_start, window_end, window_time])
+   :        :     +- WindowTableFunction(window=[TUMBLE(time_col=[rowtime], 
size=[15 min])])
+   :        :        +- WatermarkAssigner(rowtime=[rowtime], 
watermark=[-(rowtime, 1000:INTERVAL SECOND)])
+   :        :           +- Calc(select=[a, b, c, rowtime, PROCTIME() AS 
proctime])
+   :        :              +- TableSourceScan(table=[[default_catalog, 
default_database, MyTable]], fields=[a, b, c, rowtime])
+   :        +- Exchange(distribution=[hash[a]])
+   :           +- Calc(select=[a, b, c, window_start, window_end])
+   :              +- WindowTableFunction(window=[TUMBLE(time_col=[rowtime], 
size=[15 min])])
+   :                 +- WatermarkAssigner(rowtime=[rowtime], 
watermark=[-(rowtime, 1000:INTERVAL SECOND)])
+   :                    +- Calc(select=[a, b, c, rowtime, PROCTIME() AS 
proctime])
+   :                       +- TableSourceScan(table=[[default_catalog, 
default_database, MyTable2]], fields=[a, b, c, rowtime])
+   +- Exchange(distribution=[hash[a]])
+      +- WatermarkAssigner(rowtime=[rowtime], watermark=[-(rowtime, 
1000:INTERVAL SECOND)])
+         +- Calc(select=[a, b, c, rowtime, PROCTIME() AS proctime])
+            +- TableSourceScan(table=[[default_catalog, default_database, 
MyTable3]], fields=[a, b, c, rowtime])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testWindowJoinWithNonEqui">
+    <Resource name="sql">
+      <![CDATA[
+SELECT L.*, R.*
+FROM (
+  SELECT
+    a,
+    window_start,
+    window_end,
+    window_time,
+    count(*) as cnt,
+    count(distinct c) AS uv
+  FROM TABLE(
+    CUMULATE(
+      TABLE MyTable, DESCRIPTOR(proctime), INTERVAL '10' MINUTE, INTERVAL '1' 
HOUR))
+  GROUP BY a, window_start, window_end, window_time
+) L
+JOIN (
+  SELECT
+    a,
+    window_start,
+    window_end,
+    window_time,
+    count(*) as cnt,
+    count(distinct c) AS uv
+  FROM TABLE(
+    CUMULATE(
+      TABLE MyTable2, DESCRIPTOR(proctime), INTERVAL '10' MINUTE, INTERVAL '1' 
HOUR))
+  GROUP BY a, window_start, window_end, window_time
+) R
+ON L.window_start = R.window_start AND L.window_end = R.window_end AND L.a = 
R.a AND
+ CAST(L.window_start AS BIGINT) > R.uv
+      ]]>
+    </Resource>
+    <Resource name="ast">
+      <![CDATA[
+LogicalProject(a=[$0], window_start=[$1], window_end=[$2], window_time=[$3], 
cnt=[$4], uv=[$5], a0=[$6], window_start0=[$7], window_end0=[$8], 
window_time0=[$9], cnt0=[$10], uv0=[$11])
++- LogicalJoin(condition=[AND(=($1, $7), =($2, $8), =($0, $6), 
>(CAST($1):BIGINT NOT NULL, $11))], joinType=[inner])
+   :- LogicalAggregate(group=[{0, 1, 2, 3}], cnt=[COUNT()], uv=[COUNT(DISTINCT 
$4)])
+   :  +- LogicalProject(a=[$0], window_start=[$5], window_end=[$6], 
window_time=[$7], c=[$2])
+   :     +- LogicalTableFunctionScan(invocation=[CUMULATE($4, DESCRIPTOR($4), 
600000:INTERVAL MINUTE, 3600000:INTERVAL HOUR)], rowType=[RecordType(INTEGER a, 
VARCHAR(2147483647) b, BIGINT c, TIME ATTRIBUTE(ROWTIME) rowtime, TIME 
ATTRIBUTE(PROCTIME) proctime, TIMESTAMP(3) window_start, TIMESTAMP(3) 
window_end, TIME ATTRIBUTE(PROCTIME) window_time)])
+   :        +- LogicalProject(a=[$0], b=[$1], c=[$2], rowtime=[$3], 
proctime=[$4])
+   :           +- LogicalWatermarkAssigner(rowtime=[rowtime], watermark=[-($3, 
1000:INTERVAL SECOND)])
+   :              +- LogicalProject(a=[$0], b=[$1], c=[$2], rowtime=[$3], 
proctime=[PROCTIME()])
+   :                 +- LogicalTableScan(table=[[default_catalog, 
default_database, MyTable]])
+   +- LogicalAggregate(group=[{0, 1, 2, 3}], cnt=[COUNT()], uv=[COUNT(DISTINCT 
$4)])
+      +- LogicalProject(a=[$0], window_start=[$5], window_end=[$6], 
window_time=[$7], c=[$2])
+         +- LogicalTableFunctionScan(invocation=[CUMULATE($4, DESCRIPTOR($4), 
600000:INTERVAL MINUTE, 3600000:INTERVAL HOUR)], rowType=[RecordType(INTEGER a, 
VARCHAR(2147483647) b, BIGINT c, TIME ATTRIBUTE(ROWTIME) rowtime, TIME 
ATTRIBUTE(PROCTIME) proctime, TIMESTAMP(3) window_start, TIMESTAMP(3) 
window_end, TIME ATTRIBUTE(PROCTIME) window_time)])
+            +- LogicalProject(a=[$0], b=[$1], c=[$2], rowtime=[$3], 
proctime=[$4])
+               +- LogicalWatermarkAssigner(rowtime=[rowtime], watermark=[-($3, 
1000:INTERVAL SECOND)])
+                  +- LogicalProject(a=[$0], b=[$1], c=[$2], rowtime=[$3], 
proctime=[PROCTIME()])
+                     +- LogicalTableScan(table=[[default_catalog, 
default_database, MyTable2]])
+]]>
+    </Resource>
+    <Resource name="optimized rel plan">
+      <![CDATA[
+Calc(select=[a, window_start, window_end, PROCTIME_MATERIALIZE(window_time) AS 
window_time, cnt, uv, a0, window_start0, window_end0, 
PROCTIME_MATERIALIZE(window_time0) AS window_time0, cnt0, uv0])
++- WindowJoin(leftWindow=[CUMULATE(win_start=[window_start], 
win_end=[window_end], max_size=[1 h], step=[10 min])], 
rightWindow=[CUMULATE(win_start=[window_start], win_end=[window_end], 
max_size=[1 h], step=[10 min])], joinType=[InnerJoin], where=[AND(=(a, a0), 
>(CAST(window_start), uv0))], select=[a, window_start, window_end, window_time, 
cnt, uv, a0, window_start0, window_end0, window_time0, cnt0, uv0])
+   :- Exchange(distribution=[hash[a]])
+   :  +- Calc(select=[a, window_start, window_end, window_time, cnt, uv])
+   :     +- WindowAggregate(groupBy=[a], window=[CUMULATE(time_col=[proctime], 
max_size=[1 h], step=[10 min])], select=[a, COUNT(*) AS cnt, COUNT(DISTINCT c) 
AS uv, start('w$) AS window_start, end('w$) AS window_end, proctime('w$) AS 
window_time])
+   :        +- Exchange(distribution=[hash[a]])
+   :           +- Calc(select=[a, c, proctime])
+   :              +- WatermarkAssigner(rowtime=[rowtime], 
watermark=[-(rowtime, 1000:INTERVAL SECOND)])
+   :                 +- Calc(select=[a, b, c, rowtime, PROCTIME() AS proctime])
+   :                    +- TableSourceScan(table=[[default_catalog, 
default_database, MyTable]], fields=[a, b, c, rowtime])
+   +- Exchange(distribution=[hash[a]])
+      +- Calc(select=[a, window_start, window_end, window_time, cnt, uv])
+         +- WindowAggregate(groupBy=[a], window=[CUMULATE(time_col=[proctime], 
max_size=[1 h], step=[10 min])], select=[a, COUNT(*) AS cnt, COUNT(DISTINCT c) 
AS uv, start('w$) AS window_start, end('w$) AS window_end, proctime('w$) AS 
window_time])
+            +- Exchange(distribution=[hash[a]])
+               +- Calc(select=[a, c, proctime])
+                  +- WatermarkAssigner(rowtime=[rowtime], 
watermark=[-(rowtime, 1000:INTERVAL SECOND)])
+                     +- Calc(select=[a, b, c, rowtime, PROCTIME() AS proctime])
+                        +- TableSourceScan(table=[[default_catalog, 
default_database, MyTable2]], fields=[a, b, c, rowtime])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testWindowPropertyPropagateForWindowJoin">
+    <Resource name="sql">
+      <![CDATA[
+SELECT * FROM
+(
+  SELECT *,
+    ROW_NUMBER() OVER(
+      PARTITION BY window_start, window_end ORDER BY l_cnt DESC) as rownum
+  FROM tmp2
+)
+WHERE rownum <= 3
+      ]]>
+    </Resource>
+    <Resource name="ast">
+      <![CDATA[
+LogicalProject(window_start=[$0], window_end=[$1], a=[$2], l_cnt=[$3], 
l_uv=[$4], r_cnt=[$5], r_uv=[$6], rownum=[$7])
++- LogicalFilter(condition=[<=($7, 3)])
+   +- LogicalProject(window_start=[$1], window_end=[$2], a=[$0], l_cnt=[$4], 
l_uv=[$5], r_cnt=[$10], r_uv=[$11], rownum=[ROW_NUMBER() OVER (PARTITION BY $1, 
$2 ORDER BY $4 DESC NULLS LAST)])
+      +- LogicalJoin(condition=[AND(=($1, $7), =($2, $8), =($0, $6))], 
joinType=[inner])
+         :- LogicalAggregate(group=[{0, 1, 2, 3}], cnt=[COUNT()], 
uv=[COUNT(DISTINCT $4)])
+         :  +- LogicalProject(a=[$0], window_start=[$5], window_end=[$6], 
window_time=[$7], c=[$2])
+         :     +- LogicalTableFunctionScan(invocation=[CUMULATE($4, 
DESCRIPTOR($3), 600000:INTERVAL MINUTE, 3600000:INTERVAL HOUR)], 
rowType=[RecordType(INTEGER a, VARCHAR(2147483647) b, BIGINT c, TIME 
ATTRIBUTE(ROWTIME) rowtime, TIME ATTRIBUTE(PROCTIME) proctime, TIMESTAMP(3) 
window_start, TIMESTAMP(3) window_end, TIME ATTRIBUTE(ROWTIME) window_time)])
+         :        +- LogicalProject(a=[$0], b=[$1], c=[$2], rowtime=[$3], 
proctime=[$4])
+         :           +- LogicalWatermarkAssigner(rowtime=[rowtime], 
watermark=[-($3, 1000:INTERVAL SECOND)])
+         :              +- LogicalProject(a=[$0], b=[$1], c=[$2], 
rowtime=[$3], proctime=[PROCTIME()])
+         :                 +- LogicalTableScan(table=[[default_catalog, 
default_database, MyTable]])
+         +- LogicalAggregate(group=[{0, 1, 2, 3}], cnt=[COUNT()], 
uv=[COUNT(DISTINCT $4)])
+            +- LogicalProject(a=[$0], window_start=[$5], window_end=[$6], 
window_time=[$7], c=[$2])
+               +- LogicalTableFunctionScan(invocation=[CUMULATE($4, 
DESCRIPTOR($3), 600000:INTERVAL MINUTE, 3600000:INTERVAL HOUR)], 
rowType=[RecordType(INTEGER a, VARCHAR(2147483647) b, BIGINT c, TIME 
ATTRIBUTE(ROWTIME) rowtime, TIME ATTRIBUTE(PROCTIME) proctime, TIMESTAMP(3) 
window_start, TIMESTAMP(3) window_end, TIME ATTRIBUTE(ROWTIME) window_time)])
+                  +- LogicalProject(a=[$0], b=[$1], c=[$2], rowtime=[$3], 
proctime=[$4])
+                     +- LogicalWatermarkAssigner(rowtime=[rowtime], 
watermark=[-($3, 1000:INTERVAL SECOND)])
+                        +- LogicalProject(a=[$0], b=[$1], c=[$2], 
rowtime=[$3], proctime=[PROCTIME()])
+                           +- LogicalTableScan(table=[[default_catalog, 
default_database, MyTable2]])
+]]>
+    </Resource>
+    <Resource name="optimized rel plan">
+      <![CDATA[
+Calc(select=[window_start, window_end, a, cnt AS l_cnt, uv AS l_uv, cnt0 AS 
r_cnt, uv0 AS r_uv, rownum])
++- Rank(strategy=[AppendFastStrategy], rankType=[ROW_NUMBER], 
rankRange=[rankStart=1, rankEnd=3], partitionBy=[window_start, window_end], 
orderBy=[cnt DESC], select=[a, window_start, window_end, window_time, cnt, uv, 
a0, window_start0, window_end0, window_time0, cnt0, uv0, rownum])

Review comment:
       Good catch, I missed windowProperties inference for `FlinkLogicalJoin` 
in `FlinkRelMdWindowProperties`.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to