beyond1920 commented on a change in pull request #15195: URL: https://github.com/apache/flink/pull/15195#discussion_r594034270
########## File path: flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/join/WindowJoinTest.xml ########## @@ -0,0 +1,1041 @@ +<?xml version="1.0" ?> +<!-- +Licensed to the Apache Software Foundation (ASF) under one or more +contributor license agreements. See the NOTICE file distributed with +this work for additional information regarding copyright ownership. +The ASF licenses this file to you under the Apache License, Version 2.0 +(the "License"); you may not use this file except in compliance with +the License. You may obtain a copy of the License at + +http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +--> +<Root> + <TestCase name="testCantMergeWindowTVF_Cumulate"> + <Resource name="sql"> + <![CDATA[ +SELECT L.a, L.b, L.c, R.a, R.b, R.c +FROM ( + SELECT * + FROM TABLE( + CUMULATE(TABLE MyTable, DESCRIPTOR(rowtime), INTERVAL '10' MINUTE, INTERVAL '1' HOUR)) +) L +JOIN ( + SELECT * + FROM TABLE( + CUMULATE(TABLE MyTable2, DESCRIPTOR(rowtime), INTERVAL '10' MINUTE, INTERVAL '1' HOUR)) +) R +ON L.window_start = R.window_start AND L.window_end = R.window_end AND L.a = R.a + ]]> + </Resource> + <Resource name="ast"> + <![CDATA[ +LogicalProject(a=[$0], b=[$1], c=[$2], a0=[$8], b0=[$9], c0=[$10]) ++- LogicalJoin(condition=[AND(=($5, $13), =($6, $14), =($0, $8))], joinType=[inner]) + :- LogicalProject(a=[$0], b=[$1], c=[$2], rowtime=[$3], proctime=[$4], window_start=[$5], window_end=[$6], window_time=[$7]) + : +- LogicalTableFunctionScan(invocation=[CUMULATE($4, DESCRIPTOR($3), 600000:INTERVAL MINUTE, 3600000:INTERVAL HOUR)], rowType=[RecordType(INTEGER a, VARCHAR(2147483647) b, BIGINT c, TIME ATTRIBUTE(ROWTIME) 
rowtime, TIME ATTRIBUTE(PROCTIME) proctime, TIMESTAMP(3) window_start, TIMESTAMP(3) window_end, TIME ATTRIBUTE(ROWTIME) window_time)]) + : +- LogicalProject(a=[$0], b=[$1], c=[$2], rowtime=[$3], proctime=[$4]) + : +- LogicalWatermarkAssigner(rowtime=[rowtime], watermark=[-($3, 1000:INTERVAL SECOND)]) + : +- LogicalProject(a=[$0], b=[$1], c=[$2], rowtime=[$3], proctime=[PROCTIME()]) + : +- LogicalTableScan(table=[[default_catalog, default_database, MyTable]]) + +- LogicalProject(a=[$0], b=[$1], c=[$2], rowtime=[$3], proctime=[$4], window_start=[$5], window_end=[$6], window_time=[$7]) + +- LogicalTableFunctionScan(invocation=[CUMULATE($4, DESCRIPTOR($3), 600000:INTERVAL MINUTE, 3600000:INTERVAL HOUR)], rowType=[RecordType(INTEGER a, VARCHAR(2147483647) b, BIGINT c, TIME ATTRIBUTE(ROWTIME) rowtime, TIME ATTRIBUTE(PROCTIME) proctime, TIMESTAMP(3) window_start, TIMESTAMP(3) window_end, TIME ATTRIBUTE(ROWTIME) window_time)]) + +- LogicalProject(a=[$0], b=[$1], c=[$2], rowtime=[$3], proctime=[$4]) + +- LogicalWatermarkAssigner(rowtime=[rowtime], watermark=[-($3, 1000:INTERVAL SECOND)]) + +- LogicalProject(a=[$0], b=[$1], c=[$2], rowtime=[$3], proctime=[PROCTIME()]) + +- LogicalTableScan(table=[[default_catalog, default_database, MyTable2]]) +]]> + </Resource> + <Resource name="optimized rel plan"> + <![CDATA[ +Calc(select=[a, b, c, a0, b0, c0]) ++- WindowJoin(leftWindow=[CUMULATE(win_start=[window_start], win_end=[window_end], max_size=[1 h], step=[10 min])], rightWindow=[CUMULATE(win_start=[window_start], win_end=[window_end], max_size=[1 h], step=[10 min])], joinType=[InnerJoin], where=[=(a, a0)], select=[a, b, c, window_start, window_end, a0, b0, c0, window_start0, window_end0]) + :- Exchange(distribution=[hash[a]]) + : +- Calc(select=[a, b, c, window_start, window_end]) + : +- WindowTableFunction(window=[CUMULATE(time_col=[rowtime], max_size=[1 h], step=[10 min])]) + : +- WatermarkAssigner(rowtime=[rowtime], watermark=[-(rowtime, 1000:INTERVAL SECOND)]) + : +- 
Calc(select=[a, b, c, rowtime, PROCTIME() AS proctime]) + : +- TableSourceScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c, rowtime]) + +- Exchange(distribution=[hash[a]]) + +- Calc(select=[a, b, c, window_start, window_end]) + +- WindowTableFunction(window=[CUMULATE(time_col=[rowtime], max_size=[1 h], step=[10 min])]) + +- WatermarkAssigner(rowtime=[rowtime], watermark=[-(rowtime, 1000:INTERVAL SECOND)]) + +- Calc(select=[a, b, c, rowtime, PROCTIME() AS proctime]) + +- TableSourceScan(table=[[default_catalog, default_database, MyTable2]], fields=[a, b, c, rowtime]) +]]> + </Resource> + </TestCase> + <TestCase name="testCantMergeWindowTVF_CumulateOnProctime"> + <Resource name="sql"> + <![CDATA[ +SELECT L.a, L.b, L.c, R.a, R.b, R.c +FROM ( + SELECT * + FROM TABLE( + CUMULATE(TABLE MyTable, DESCRIPTOR(proctime), INTERVAL '10' MINUTE, INTERVAL '1' HOUR)) +) L +JOIN ( + SELECT * + FROM TABLE( + CUMULATE(TABLE MyTable2, DESCRIPTOR(proctime), INTERVAL '10' MINUTE, INTERVAL '1' HOUR)) +) R +ON L.window_start = R.window_start AND L.window_end = R.window_end AND L.a = R.a + ]]> + </Resource> + <Resource name="ast"> + <![CDATA[ +LogicalProject(a=[$0], b=[$1], c=[$2], a0=[$8], b0=[$9], c0=[$10]) ++- LogicalJoin(condition=[AND(=($5, $13), =($6, $14), =($0, $8))], joinType=[inner]) + :- LogicalProject(a=[$0], b=[$1], c=[$2], rowtime=[$3], proctime=[$4], window_start=[$5], window_end=[$6], window_time=[$7]) + : +- LogicalTableFunctionScan(invocation=[CUMULATE($4, DESCRIPTOR($4), 600000:INTERVAL MINUTE, 3600000:INTERVAL HOUR)], rowType=[RecordType(INTEGER a, VARCHAR(2147483647) b, BIGINT c, TIME ATTRIBUTE(ROWTIME) rowtime, TIME ATTRIBUTE(PROCTIME) proctime, TIMESTAMP(3) window_start, TIMESTAMP(3) window_end, TIME ATTRIBUTE(PROCTIME) window_time)]) + : +- LogicalProject(a=[$0], b=[$1], c=[$2], rowtime=[$3], proctime=[$4]) + : +- LogicalWatermarkAssigner(rowtime=[rowtime], watermark=[-($3, 1000:INTERVAL SECOND)]) + : +- LogicalProject(a=[$0], b=[$1], 
c=[$2], rowtime=[$3], proctime=[PROCTIME()]) + : +- LogicalTableScan(table=[[default_catalog, default_database, MyTable]]) + +- LogicalProject(a=[$0], b=[$1], c=[$2], rowtime=[$3], proctime=[$4], window_start=[$5], window_end=[$6], window_time=[$7]) + +- LogicalTableFunctionScan(invocation=[CUMULATE($4, DESCRIPTOR($4), 600000:INTERVAL MINUTE, 3600000:INTERVAL HOUR)], rowType=[RecordType(INTEGER a, VARCHAR(2147483647) b, BIGINT c, TIME ATTRIBUTE(ROWTIME) rowtime, TIME ATTRIBUTE(PROCTIME) proctime, TIMESTAMP(3) window_start, TIMESTAMP(3) window_end, TIME ATTRIBUTE(PROCTIME) window_time)]) + +- LogicalProject(a=[$0], b=[$1], c=[$2], rowtime=[$3], proctime=[$4]) + +- LogicalWatermarkAssigner(rowtime=[rowtime], watermark=[-($3, 1000:INTERVAL SECOND)]) + +- LogicalProject(a=[$0], b=[$1], c=[$2], rowtime=[$3], proctime=[PROCTIME()]) + +- LogicalTableScan(table=[[default_catalog, default_database, MyTable2]]) +]]> + </Resource> + <Resource name="optimized rel plan"> + <![CDATA[ +Calc(select=[a, b, c, a0, b0, c0]) ++- WindowJoin(leftWindow=[CUMULATE(win_start=[window_start], win_end=[window_end], max_size=[1 h], step=[10 min])], rightWindow=[CUMULATE(win_start=[window_start], win_end=[window_end], max_size=[1 h], step=[10 min])], joinType=[InnerJoin], where=[=(a, a0)], select=[a, b, c, window_start, window_end, a0, b0, c0, window_start0, window_end0]) + :- Exchange(distribution=[hash[a]]) + : +- Calc(select=[a, b, c, window_start, window_end]) + : +- WindowTableFunction(window=[CUMULATE(time_col=[proctime], max_size=[1 h], step=[10 min])]) + : +- WatermarkAssigner(rowtime=[rowtime], watermark=[-(rowtime, 1000:INTERVAL SECOND)]) + : +- Calc(select=[a, b, c, rowtime, PROCTIME() AS proctime]) + : +- TableSourceScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c, rowtime]) + +- Exchange(distribution=[hash[a]]) + +- Calc(select=[a, b, c, window_start, window_end]) + +- WindowTableFunction(window=[CUMULATE(time_col=[proctime], max_size=[1 h], step=[10 
min])]) + +- WatermarkAssigner(rowtime=[rowtime], watermark=[-(rowtime, 1000:INTERVAL SECOND)]) + +- Calc(select=[a, b, c, rowtime, PROCTIME() AS proctime]) + +- TableSourceScan(table=[[default_catalog, default_database, MyTable2]], fields=[a, b, c, rowtime]) +]]> + </Resource> + </TestCase> + <TestCase name="testCantMergeWindowTVF_Hop"> + <Resource name="sql"> + <![CDATA[ +SELECT L.a, L.b, L.c, R.a, R.b, R.c +FROM ( + SELECT * + FROM TABLE( + HOP(TABLE MyTable, DESCRIPTOR(rowtime), INTERVAL '5' MINUTE, INTERVAL '10' MINUTE)) +) L +JOIN ( + SELECT * + FROM TABLE( + HOP(TABLE MyTable2, DESCRIPTOR(rowtime), INTERVAL '5' MINUTE, INTERVAL '10' MINUTE)) +) R +ON L.window_start = R.window_start AND L.window_end = R.window_end AND L.a = R.a + ]]> + </Resource> + <Resource name="ast"> + <![CDATA[ +LogicalProject(a=[$0], b=[$1], c=[$2], a0=[$8], b0=[$9], c0=[$10]) ++- LogicalJoin(condition=[AND(=($5, $13), =($6, $14), =($0, $8))], joinType=[inner]) + :- LogicalProject(a=[$0], b=[$1], c=[$2], rowtime=[$3], proctime=[$4], window_start=[$5], window_end=[$6], window_time=[$7]) + : +- LogicalTableFunctionScan(invocation=[HOP($4, DESCRIPTOR($3), 300000:INTERVAL MINUTE, 600000:INTERVAL MINUTE)], rowType=[RecordType(INTEGER a, VARCHAR(2147483647) b, BIGINT c, TIME ATTRIBUTE(ROWTIME) rowtime, TIME ATTRIBUTE(PROCTIME) proctime, TIMESTAMP(3) window_start, TIMESTAMP(3) window_end, TIME ATTRIBUTE(ROWTIME) window_time)]) + : +- LogicalProject(a=[$0], b=[$1], c=[$2], rowtime=[$3], proctime=[$4]) + : +- LogicalWatermarkAssigner(rowtime=[rowtime], watermark=[-($3, 1000:INTERVAL SECOND)]) + : +- LogicalProject(a=[$0], b=[$1], c=[$2], rowtime=[$3], proctime=[PROCTIME()]) + : +- LogicalTableScan(table=[[default_catalog, default_database, MyTable]]) + +- LogicalProject(a=[$0], b=[$1], c=[$2], rowtime=[$3], proctime=[$4], window_start=[$5], window_end=[$6], window_time=[$7]) + +- LogicalTableFunctionScan(invocation=[HOP($4, DESCRIPTOR($3), 300000:INTERVAL MINUTE, 600000:INTERVAL MINUTE)], 
rowType=[RecordType(INTEGER a, VARCHAR(2147483647) b, BIGINT c, TIME ATTRIBUTE(ROWTIME) rowtime, TIME ATTRIBUTE(PROCTIME) proctime, TIMESTAMP(3) window_start, TIMESTAMP(3) window_end, TIME ATTRIBUTE(ROWTIME) window_time)]) + +- LogicalProject(a=[$0], b=[$1], c=[$2], rowtime=[$3], proctime=[$4]) + +- LogicalWatermarkAssigner(rowtime=[rowtime], watermark=[-($3, 1000:INTERVAL SECOND)]) + +- LogicalProject(a=[$0], b=[$1], c=[$2], rowtime=[$3], proctime=[PROCTIME()]) + +- LogicalTableScan(table=[[default_catalog, default_database, MyTable2]]) +]]> + </Resource> + <Resource name="optimized rel plan"> + <![CDATA[ +Calc(select=[a, b, c, a0, b0, c0]) ++- WindowJoin(leftWindow=[HOP(win_start=[window_start], win_end=[window_end], size=[10 min], slide=[5 min])], rightWindow=[HOP(win_start=[window_start], win_end=[window_end], size=[10 min], slide=[5 min])], joinType=[InnerJoin], where=[=(a, a0)], select=[a, b, c, window_start, window_end, a0, b0, c0, window_start0, window_end0]) + :- Exchange(distribution=[hash[a]]) + : +- Calc(select=[a, b, c, window_start, window_end]) + : +- WindowTableFunction(window=[HOP(time_col=[rowtime], size=[10 min], slide=[5 min])]) + : +- WatermarkAssigner(rowtime=[rowtime], watermark=[-(rowtime, 1000:INTERVAL SECOND)]) + : +- Calc(select=[a, b, c, rowtime, PROCTIME() AS proctime]) + : +- TableSourceScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c, rowtime]) + +- Exchange(distribution=[hash[a]]) + +- Calc(select=[a, b, c, window_start, window_end]) + +- WindowTableFunction(window=[HOP(time_col=[rowtime], size=[10 min], slide=[5 min])]) + +- WatermarkAssigner(rowtime=[rowtime], watermark=[-(rowtime, 1000:INTERVAL SECOND)]) + +- Calc(select=[a, b, c, rowtime, PROCTIME() AS proctime]) + +- TableSourceScan(table=[[default_catalog, default_database, MyTable2]], fields=[a, b, c, rowtime]) +]]> + </Resource> + </TestCase> + <TestCase name="testCantMergeWindowTVF_HopOnProctime"> + <Resource name="sql"> + <![CDATA[ +SELECT L.a, 
L.b, L.c, R.a, R.b, R.c +FROM ( + SELECT * + FROM TABLE( + HOP(TABLE MyTable, DESCRIPTOR(proctime), INTERVAL '5' MINUTE, INTERVAL '10' MINUTE)) +) L +JOIN ( + SELECT * + FROM TABLE( + HOP(TABLE MyTable2, DESCRIPTOR(proctime), INTERVAL '5' MINUTE, INTERVAL '10' MINUTE)) +) R +ON L.window_start = R.window_start AND L.window_end = R.window_end AND L.a = R.a + ]]> + </Resource> + <Resource name="ast"> + <![CDATA[ +LogicalProject(a=[$0], b=[$1], c=[$2], a0=[$8], b0=[$9], c0=[$10]) ++- LogicalJoin(condition=[AND(=($5, $13), =($6, $14), =($0, $8))], joinType=[inner]) + :- LogicalProject(a=[$0], b=[$1], c=[$2], rowtime=[$3], proctime=[$4], window_start=[$5], window_end=[$6], window_time=[$7]) + : +- LogicalTableFunctionScan(invocation=[HOP($4, DESCRIPTOR($4), 300000:INTERVAL MINUTE, 600000:INTERVAL MINUTE)], rowType=[RecordType(INTEGER a, VARCHAR(2147483647) b, BIGINT c, TIME ATTRIBUTE(ROWTIME) rowtime, TIME ATTRIBUTE(PROCTIME) proctime, TIMESTAMP(3) window_start, TIMESTAMP(3) window_end, TIME ATTRIBUTE(PROCTIME) window_time)]) + : +- LogicalProject(a=[$0], b=[$1], c=[$2], rowtime=[$3], proctime=[$4]) + : +- LogicalWatermarkAssigner(rowtime=[rowtime], watermark=[-($3, 1000:INTERVAL SECOND)]) + : +- LogicalProject(a=[$0], b=[$1], c=[$2], rowtime=[$3], proctime=[PROCTIME()]) + : +- LogicalTableScan(table=[[default_catalog, default_database, MyTable]]) + +- LogicalProject(a=[$0], b=[$1], c=[$2], rowtime=[$3], proctime=[$4], window_start=[$5], window_end=[$6], window_time=[$7]) + +- LogicalTableFunctionScan(invocation=[HOP($4, DESCRIPTOR($4), 300000:INTERVAL MINUTE, 600000:INTERVAL MINUTE)], rowType=[RecordType(INTEGER a, VARCHAR(2147483647) b, BIGINT c, TIME ATTRIBUTE(ROWTIME) rowtime, TIME ATTRIBUTE(PROCTIME) proctime, TIMESTAMP(3) window_start, TIMESTAMP(3) window_end, TIME ATTRIBUTE(PROCTIME) window_time)]) + +- LogicalProject(a=[$0], b=[$1], c=[$2], rowtime=[$3], proctime=[$4]) + +- LogicalWatermarkAssigner(rowtime=[rowtime], watermark=[-($3, 1000:INTERVAL SECOND)]) + +- 
LogicalProject(a=[$0], b=[$1], c=[$2], rowtime=[$3], proctime=[PROCTIME()]) + +- LogicalTableScan(table=[[default_catalog, default_database, MyTable2]]) +]]> + </Resource> + <Resource name="optimized rel plan"> + <![CDATA[ +Calc(select=[a, b, c, a0, b0, c0]) ++- WindowJoin(leftWindow=[HOP(win_start=[window_start], win_end=[window_end], size=[10 min], slide=[5 min])], rightWindow=[HOP(win_start=[window_start], win_end=[window_end], size=[10 min], slide=[5 min])], joinType=[InnerJoin], where=[=(a, a0)], select=[a, b, c, window_start, window_end, a0, b0, c0, window_start0, window_end0]) + :- Exchange(distribution=[hash[a]]) + : +- Calc(select=[a, b, c, window_start, window_end]) + : +- WindowTableFunction(window=[HOP(time_col=[proctime], size=[10 min], slide=[5 min])]) + : +- WatermarkAssigner(rowtime=[rowtime], watermark=[-(rowtime, 1000:INTERVAL SECOND)]) + : +- Calc(select=[a, b, c, rowtime, PROCTIME() AS proctime]) + : +- TableSourceScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c, rowtime]) + +- Exchange(distribution=[hash[a]]) + +- Calc(select=[a, b, c, window_start, window_end]) + +- WindowTableFunction(window=[HOP(time_col=[proctime], size=[10 min], slide=[5 min])]) + +- WatermarkAssigner(rowtime=[rowtime], watermark=[-(rowtime, 1000:INTERVAL SECOND)]) + +- Calc(select=[a, b, c, rowtime, PROCTIME() AS proctime]) + +- TableSourceScan(table=[[default_catalog, default_database, MyTable2]], fields=[a, b, c, rowtime]) +]]> + </Resource> + </TestCase> + <TestCase name="testCantMergeWindowTVF_Tumble"> + <Resource name="sql"> + <![CDATA[ +SELECT L.a, L.b, L.c, R.a, R.b, R.c +FROM ( + SELECT * + FROM TABLE(TUMBLE(TABLE MyTable, DESCRIPTOR(rowtime), INTERVAL '15' MINUTE)) +) L +JOIN ( + SELECT * + FROM TABLE(TUMBLE(TABLE MyTable2, DESCRIPTOR(rowtime), INTERVAL '15' MINUTE)) +) R +ON L.window_start = R.window_start AND L.window_end = R.window_end AND L.a = R.a + ]]> + </Resource> + <Resource name="ast"> + <![CDATA[ +LogicalProject(a=[$0], 
b=[$1], c=[$2], a0=[$8], b0=[$9], c0=[$10]) ++- LogicalJoin(condition=[AND(=($5, $13), =($6, $14), =($0, $8))], joinType=[inner]) + :- LogicalProject(a=[$0], b=[$1], c=[$2], rowtime=[$3], proctime=[$4], window_start=[$5], window_end=[$6], window_time=[$7]) + : +- LogicalTableFunctionScan(invocation=[TUMBLE($4, DESCRIPTOR($3), 900000:INTERVAL MINUTE)], rowType=[RecordType(INTEGER a, VARCHAR(2147483647) b, BIGINT c, TIME ATTRIBUTE(ROWTIME) rowtime, TIME ATTRIBUTE(PROCTIME) proctime, TIMESTAMP(3) window_start, TIMESTAMP(3) window_end, TIME ATTRIBUTE(ROWTIME) window_time)]) + : +- LogicalProject(a=[$0], b=[$1], c=[$2], rowtime=[$3], proctime=[$4]) + : +- LogicalWatermarkAssigner(rowtime=[rowtime], watermark=[-($3, 1000:INTERVAL SECOND)]) + : +- LogicalProject(a=[$0], b=[$1], c=[$2], rowtime=[$3], proctime=[PROCTIME()]) + : +- LogicalTableScan(table=[[default_catalog, default_database, MyTable]]) + +- LogicalProject(a=[$0], b=[$1], c=[$2], rowtime=[$3], proctime=[$4], window_start=[$5], window_end=[$6], window_time=[$7]) + +- LogicalTableFunctionScan(invocation=[TUMBLE($4, DESCRIPTOR($3), 900000:INTERVAL MINUTE)], rowType=[RecordType(INTEGER a, VARCHAR(2147483647) b, BIGINT c, TIME ATTRIBUTE(ROWTIME) rowtime, TIME ATTRIBUTE(PROCTIME) proctime, TIMESTAMP(3) window_start, TIMESTAMP(3) window_end, TIME ATTRIBUTE(ROWTIME) window_time)]) + +- LogicalProject(a=[$0], b=[$1], c=[$2], rowtime=[$3], proctime=[$4]) + +- LogicalWatermarkAssigner(rowtime=[rowtime], watermark=[-($3, 1000:INTERVAL SECOND)]) + +- LogicalProject(a=[$0], b=[$1], c=[$2], rowtime=[$3], proctime=[PROCTIME()]) + +- LogicalTableScan(table=[[default_catalog, default_database, MyTable2]]) +]]> + </Resource> + <Resource name="optimized rel plan"> + <![CDATA[ +Calc(select=[a, b, c, a0, b0, c0]) ++- WindowJoin(leftWindow=[TUMBLE(win_start=[window_start], win_end=[window_end], size=[15 min])], rightWindow=[TUMBLE(win_start=[window_start], win_end=[window_end], size=[15 min])], joinType=[InnerJoin], where=[=(a, 
a0)], select=[a, b, c, window_start, window_end, a0, b0, c0, window_start0, window_end0]) + :- Exchange(distribution=[hash[a]]) + : +- Calc(select=[a, b, c, window_start, window_end]) + : +- WindowTableFunction(window=[TUMBLE(time_col=[rowtime], size=[15 min])]) + : +- WatermarkAssigner(rowtime=[rowtime], watermark=[-(rowtime, 1000:INTERVAL SECOND)]) + : +- Calc(select=[a, b, c, rowtime, PROCTIME() AS proctime]) + : +- TableSourceScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c, rowtime]) + +- Exchange(distribution=[hash[a]]) + +- Calc(select=[a, b, c, window_start, window_end]) + +- WindowTableFunction(window=[TUMBLE(time_col=[rowtime], size=[15 min])]) + +- WatermarkAssigner(rowtime=[rowtime], watermark=[-(rowtime, 1000:INTERVAL SECOND)]) + +- Calc(select=[a, b, c, rowtime, PROCTIME() AS proctime]) + +- TableSourceScan(table=[[default_catalog, default_database, MyTable2]], fields=[a, b, c, rowtime]) +]]> + </Resource> + </TestCase> + <TestCase name="testCantMergeWindowTVF_TumbleOnProctime"> + <Resource name="sql"> + <![CDATA[ +SELECT L.a, L.b, L.c, R.a, R.b, R.c +FROM ( + SELECT * + FROM TABLE(TUMBLE(TABLE MyTable, DESCRIPTOR(proctime), INTERVAL '15' MINUTE)) +) L +JOIN ( + SELECT * + FROM TABLE(TUMBLE(TABLE MyTable2, DESCRIPTOR(proctime), INTERVAL '15' MINUTE)) +) R +ON L.window_start = R.window_start AND L.window_end = R.window_end AND L.a = R.a + ]]> + </Resource> + <Resource name="ast"> + <![CDATA[ +LogicalProject(a=[$0], b=[$1], c=[$2], a0=[$8], b0=[$9], c0=[$10]) ++- LogicalJoin(condition=[AND(=($5, $13), =($6, $14), =($0, $8))], joinType=[inner]) + :- LogicalProject(a=[$0], b=[$1], c=[$2], rowtime=[$3], proctime=[$4], window_start=[$5], window_end=[$6], window_time=[$7]) + : +- LogicalTableFunctionScan(invocation=[TUMBLE($4, DESCRIPTOR($4), 900000:INTERVAL MINUTE)], rowType=[RecordType(INTEGER a, VARCHAR(2147483647) b, BIGINT c, TIME ATTRIBUTE(ROWTIME) rowtime, TIME ATTRIBUTE(PROCTIME) proctime, TIMESTAMP(3) window_start, 
TIMESTAMP(3) window_end, TIME ATTRIBUTE(PROCTIME) window_time)]) + : +- LogicalProject(a=[$0], b=[$1], c=[$2], rowtime=[$3], proctime=[$4]) + : +- LogicalWatermarkAssigner(rowtime=[rowtime], watermark=[-($3, 1000:INTERVAL SECOND)]) + : +- LogicalProject(a=[$0], b=[$1], c=[$2], rowtime=[$3], proctime=[PROCTIME()]) + : +- LogicalTableScan(table=[[default_catalog, default_database, MyTable]]) + +- LogicalProject(a=[$0], b=[$1], c=[$2], rowtime=[$3], proctime=[$4], window_start=[$5], window_end=[$6], window_time=[$7]) + +- LogicalTableFunctionScan(invocation=[TUMBLE($4, DESCRIPTOR($4), 900000:INTERVAL MINUTE)], rowType=[RecordType(INTEGER a, VARCHAR(2147483647) b, BIGINT c, TIME ATTRIBUTE(ROWTIME) rowtime, TIME ATTRIBUTE(PROCTIME) proctime, TIMESTAMP(3) window_start, TIMESTAMP(3) window_end, TIME ATTRIBUTE(PROCTIME) window_time)]) + +- LogicalProject(a=[$0], b=[$1], c=[$2], rowtime=[$3], proctime=[$4]) + +- LogicalWatermarkAssigner(rowtime=[rowtime], watermark=[-($3, 1000:INTERVAL SECOND)]) + +- LogicalProject(a=[$0], b=[$1], c=[$2], rowtime=[$3], proctime=[PROCTIME()]) + +- LogicalTableScan(table=[[default_catalog, default_database, MyTable2]]) +]]> + </Resource> + <Resource name="optimized rel plan"> + <![CDATA[ +Calc(select=[a, b, c, a0, b0, c0]) ++- WindowJoin(leftWindow=[TUMBLE(win_start=[window_start], win_end=[window_end], size=[15 min])], rightWindow=[TUMBLE(win_start=[window_start], win_end=[window_end], size=[15 min])], joinType=[InnerJoin], where=[=(a, a0)], select=[a, b, c, window_start, window_end, a0, b0, c0, window_start0, window_end0]) + :- Exchange(distribution=[hash[a]]) + : +- Calc(select=[a, b, c, window_start, window_end]) + : +- WindowTableFunction(window=[TUMBLE(time_col=[proctime], size=[15 min])]) + : +- WatermarkAssigner(rowtime=[rowtime], watermark=[-(rowtime, 1000:INTERVAL SECOND)]) + : +- Calc(select=[a, b, c, rowtime, PROCTIME() AS proctime]) + : +- TableSourceScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c, 
rowtime]) + +- Exchange(distribution=[hash[a]]) + +- Calc(select=[a, b, c, window_start, window_end]) + +- WindowTableFunction(window=[TUMBLE(time_col=[proctime], size=[15 min])]) + +- WatermarkAssigner(rowtime=[rowtime], watermark=[-(rowtime, 1000:INTERVAL SECOND)]) + +- Calc(select=[a, b, c, rowtime, PROCTIME() AS proctime]) + +- TableSourceScan(table=[[default_catalog, default_database, MyTable2]], fields=[a, b, c, rowtime]) +]]> + </Resource> + </TestCase> + <TestCase name="testOnCumulateWindowAggregate"> + <Resource name="sql"> + <![CDATA[ +SELECT L.*, R.* +FROM ( + SELECT + a, + window_start, + window_end, + window_time, + count(*) as cnt, + count(distinct c) AS uv + FROM TABLE( + CUMULATE( + TABLE MyTable, DESCRIPTOR(rowtime), INTERVAL '10' MINUTE, INTERVAL '1' HOUR)) + GROUP BY a, window_start, window_end, window_time +) L +JOIN ( + SELECT + a, + window_start, + window_end, + window_time, + count(*) as cnt, + count(distinct c) AS uv + FROM TABLE( + CUMULATE( + TABLE MyTable2, DESCRIPTOR(rowtime), INTERVAL '10' MINUTE, INTERVAL '1' HOUR)) + GROUP BY a, window_start, window_end, window_time +) R +ON L.window_start = R.window_start AND L.window_end = R.window_end AND L.a = R.a + ]]> + </Resource> + <Resource name="ast"> + <![CDATA[ +LogicalProject(a=[$0], window_start=[$1], window_end=[$2], window_time=[$3], cnt=[$4], uv=[$5], a0=[$6], window_start0=[$7], window_end0=[$8], window_time0=[$9], cnt0=[$10], uv0=[$11]) ++- LogicalJoin(condition=[AND(=($1, $7), =($2, $8), =($0, $6))], joinType=[inner]) + :- LogicalAggregate(group=[{0, 1, 2, 3}], cnt=[COUNT()], uv=[COUNT(DISTINCT $4)]) + : +- LogicalProject(a=[$0], window_start=[$5], window_end=[$6], window_time=[$7], c=[$2]) + : +- LogicalTableFunctionScan(invocation=[CUMULATE($4, DESCRIPTOR($3), 600000:INTERVAL MINUTE, 3600000:INTERVAL HOUR)], rowType=[RecordType(INTEGER a, VARCHAR(2147483647) b, BIGINT c, TIME ATTRIBUTE(ROWTIME) rowtime, TIME ATTRIBUTE(PROCTIME) proctime, TIMESTAMP(3) window_start, TIMESTAMP(3) 
window_end, TIME ATTRIBUTE(ROWTIME) window_time)]) + : +- LogicalProject(a=[$0], b=[$1], c=[$2], rowtime=[$3], proctime=[$4]) + : +- LogicalWatermarkAssigner(rowtime=[rowtime], watermark=[-($3, 1000:INTERVAL SECOND)]) + : +- LogicalProject(a=[$0], b=[$1], c=[$2], rowtime=[$3], proctime=[PROCTIME()]) + : +- LogicalTableScan(table=[[default_catalog, default_database, MyTable]]) + +- LogicalAggregate(group=[{0, 1, 2, 3}], cnt=[COUNT()], uv=[COUNT(DISTINCT $4)]) + +- LogicalProject(a=[$0], window_start=[$5], window_end=[$6], window_time=[$7], c=[$2]) + +- LogicalTableFunctionScan(invocation=[CUMULATE($4, DESCRIPTOR($3), 600000:INTERVAL MINUTE, 3600000:INTERVAL HOUR)], rowType=[RecordType(INTEGER a, VARCHAR(2147483647) b, BIGINT c, TIME ATTRIBUTE(ROWTIME) rowtime, TIME ATTRIBUTE(PROCTIME) proctime, TIMESTAMP(3) window_start, TIMESTAMP(3) window_end, TIME ATTRIBUTE(ROWTIME) window_time)]) + +- LogicalProject(a=[$0], b=[$1], c=[$2], rowtime=[$3], proctime=[$4]) + +- LogicalWatermarkAssigner(rowtime=[rowtime], watermark=[-($3, 1000:INTERVAL SECOND)]) + +- LogicalProject(a=[$0], b=[$1], c=[$2], rowtime=[$3], proctime=[PROCTIME()]) + +- LogicalTableScan(table=[[default_catalog, default_database, MyTable2]]) +]]> + </Resource> + <Resource name="optimized rel plan"> + <![CDATA[ +WindowJoin(leftWindow=[CUMULATE(win_start=[window_start], win_end=[window_end], max_size=[1 h], step=[10 min])], rightWindow=[CUMULATE(win_start=[window_start], win_end=[window_end], max_size=[1 h], step=[10 min])], joinType=[InnerJoin], where=[=(a, a0)], select=[a, window_start, window_end, window_time, cnt, uv, a0, window_start0, window_end0, window_time0, cnt0, uv0]) +:- Exchange(distribution=[hash[a]]) +: +- Calc(select=[a, window_start, window_end, window_time, cnt, uv]) +: +- WindowAggregate(groupBy=[a], window=[CUMULATE(time_col=[rowtime], max_size=[1 h], step=[10 min])], select=[a, COUNT(*) AS cnt, COUNT(DISTINCT c) AS uv, start('w$) AS window_start, end('w$) AS window_end, rowtime('w$) AS 
window_time]) +: +- Exchange(distribution=[hash[a]]) +: +- Calc(select=[a, c, rowtime]) +: +- WatermarkAssigner(rowtime=[rowtime], watermark=[-(rowtime, 1000:INTERVAL SECOND)]) +: +- Calc(select=[a, b, c, rowtime, PROCTIME() AS proctime]) +: +- TableSourceScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c, rowtime]) ++- Exchange(distribution=[hash[a]]) + +- Calc(select=[a, window_start, window_end, window_time, cnt, uv]) + +- WindowAggregate(groupBy=[a], window=[CUMULATE(time_col=[rowtime], max_size=[1 h], step=[10 min])], select=[a, COUNT(*) AS cnt, COUNT(DISTINCT c) AS uv, start('w$) AS window_start, end('w$) AS window_end, rowtime('w$) AS window_time]) + +- Exchange(distribution=[hash[a]]) + +- Calc(select=[a, c, rowtime]) + +- WatermarkAssigner(rowtime=[rowtime], watermark=[-(rowtime, 1000:INTERVAL SECOND)]) + +- Calc(select=[a, b, c, rowtime, PROCTIME() AS proctime]) + +- TableSourceScan(table=[[default_catalog, default_database, MyTable2]], fields=[a, b, c, rowtime]) +]]> + </Resource> + </TestCase> + <TestCase name="testTimeAttributePropagateForWindowJoin1"> + <Resource name="sql"> + <![CDATA[ +SELECT tmp1.*, MyTable4.* FROM tmp1 JOIN MyTable4 ON + tmp1.a = MyTable4.a AND + tmp1.rowtime BETWEEN + MyTable4.rowtime - INTERVAL '10' SECOND AND + MyTable4.rowtime + INTERVAL '1' HOUR +]]> + </Resource> + <Resource name="ast"> + <![CDATA[ +LogicalProject(rowtime=[$0], a=[$1], l_cnt=[$2], l_uv=[$3], r_cnt=[$4], r_uv=[$5], a0=[$6], b=[$7], c=[$8], rowtime0=[$9], proctime=[$10]) ++- LogicalJoin(condition=[AND(=($1, $6), >=($0, -($9, 10000:INTERVAL SECOND)), <=($0, +($9, 3600000:INTERVAL HOUR)))], joinType=[inner]) + :- LogicalProject(rowtime=[$3], a=[$0], l_cnt=[$4], l_uv=[$5], r_cnt=[$10], r_uv=[$11]) + : +- LogicalJoin(condition=[AND(=($1, $7), =($2, $8), =($0, $6))], joinType=[inner]) + : :- LogicalAggregate(group=[{0, 1, 2, 3}], cnt=[COUNT()], uv=[COUNT(DISTINCT $4)]) + : : +- LogicalProject(a=[$0], window_start=[$5], window_end=[$6], 
window_time=[$7], c=[$2]) + : : +- LogicalTableFunctionScan(invocation=[CUMULATE($4, DESCRIPTOR($3), 600000:INTERVAL MINUTE, 3600000:INTERVAL HOUR)], rowType=[RecordType(INTEGER a, VARCHAR(2147483647) b, BIGINT c, TIME ATTRIBUTE(ROWTIME) rowtime, TIME ATTRIBUTE(PROCTIME) proctime, TIMESTAMP(3) window_start, TIMESTAMP(3) window_end, TIME ATTRIBUTE(ROWTIME) window_time)]) + : : +- LogicalProject(a=[$0], b=[$1], c=[$2], rowtime=[$3], proctime=[$4]) + : : +- LogicalWatermarkAssigner(rowtime=[rowtime], watermark=[-($3, 1000:INTERVAL SECOND)]) + : : +- LogicalProject(a=[$0], b=[$1], c=[$2], rowtime=[$3], proctime=[PROCTIME()]) + : : +- LogicalTableScan(table=[[default_catalog, default_database, MyTable]]) + : +- LogicalAggregate(group=[{0, 1, 2, 3}], cnt=[COUNT()], uv=[COUNT(DISTINCT $4)]) + : +- LogicalProject(a=[$0], window_start=[$5], window_end=[$6], window_time=[$7], c=[$2]) + : +- LogicalTableFunctionScan(invocation=[CUMULATE($4, DESCRIPTOR($3), 600000:INTERVAL MINUTE, 3600000:INTERVAL HOUR)], rowType=[RecordType(INTEGER a, VARCHAR(2147483647) b, BIGINT c, TIME ATTRIBUTE(ROWTIME) rowtime, TIME ATTRIBUTE(PROCTIME) proctime, TIMESTAMP(3) window_start, TIMESTAMP(3) window_end, TIME ATTRIBUTE(ROWTIME) window_time)]) + : +- LogicalProject(a=[$0], b=[$1], c=[$2], rowtime=[$3], proctime=[$4]) + : +- LogicalWatermarkAssigner(rowtime=[rowtime], watermark=[-($3, 1000:INTERVAL SECOND)]) + : +- LogicalProject(a=[$0], b=[$1], c=[$2], rowtime=[$3], proctime=[PROCTIME()]) + : +- LogicalTableScan(table=[[default_catalog, default_database, MyTable2]]) + +- LogicalWatermarkAssigner(rowtime=[rowtime], watermark=[-($3, 1000:INTERVAL SECOND)]) + +- LogicalProject(a=[$0], b=[$1], c=[$2], rowtime=[$3], proctime=[PROCTIME()]) + +- LogicalTableScan(table=[[default_catalog, default_database, MyTable4]]) +]]> + </Resource> + <Resource name="optimized rel plan"> + <![CDATA[ +Calc(select=[rowtime, a, l_cnt, l_uv, r_cnt, r_uv, a0, b, c, rowtime0, PROCTIME_MATERIALIZE(proctime) AS proctime]) ++- 
IntervalJoin(joinType=[InnerJoin], windowBounds=[isRowTime=true, leftLowerBound=-10000, leftUpperBound=3600000, leftTimeIndex=0, rightTimeIndex=3], where=[AND(=(a, a0), >=(rowtime, -(rowtime0, 10000:INTERVAL SECOND)), <=(rowtime, +(rowtime0, 3600000:INTERVAL HOUR)))], select=[rowtime, a, l_cnt, l_uv, r_cnt, r_uv, a0, b, c, rowtime0, proctime]) + :- Exchange(distribution=[hash[a]]) + : +- Calc(select=[window_time AS rowtime, a, cnt AS l_cnt, uv AS l_uv, cnt0 AS r_cnt, uv0 AS r_uv]) + : +- WindowJoin(leftWindow=[CUMULATE(win_start=[window_start], win_end=[window_end], max_size=[1 h], step=[10 min])], rightWindow=[CUMULATE(win_start=[window_start], win_end=[window_end], max_size=[1 h], step=[10 min])], joinType=[InnerJoin], where=[=(a, a0)], select=[a, window_start, window_end, window_time, cnt, uv, a0, window_start0, window_end0, cnt0, uv0]) + : :- Exchange(distribution=[hash[a]]) + : : +- Calc(select=[a, window_start, window_end, window_time, cnt, uv]) + : : +- WindowAggregate(groupBy=[a], window=[CUMULATE(time_col=[rowtime], max_size=[1 h], step=[10 min])], select=[a, COUNT(*) AS cnt, COUNT(DISTINCT c) AS uv, start('w$) AS window_start, end('w$) AS window_end, rowtime('w$) AS window_time]) + : : +- Exchange(distribution=[hash[a]]) + : : +- Calc(select=[a, c, rowtime]) + : : +- WatermarkAssigner(rowtime=[rowtime], watermark=[-(rowtime, 1000:INTERVAL SECOND)]) + : : +- Calc(select=[a, b, c, rowtime, PROCTIME() AS proctime]) + : : +- TableSourceScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c, rowtime]) + : +- Exchange(distribution=[hash[a]]) + : +- Calc(select=[a, window_start, window_end, cnt, uv]) + : +- WindowAggregate(groupBy=[a], window=[CUMULATE(time_col=[rowtime], max_size=[1 h], step=[10 min])], select=[a, COUNT(*) AS cnt, COUNT(DISTINCT c) AS uv, start('w$) AS window_start, end('w$) AS window_end, rowtime('w$) AS window_time]) + : +- Exchange(distribution=[hash[a]]) + : +- Calc(select=[a, c, rowtime]) + : +- 
WatermarkAssigner(rowtime=[rowtime], watermark=[-(rowtime, 1000:INTERVAL SECOND)]) + : +- Calc(select=[a, b, c, rowtime, PROCTIME() AS proctime]) + : +- TableSourceScan(table=[[default_catalog, default_database, MyTable2]], fields=[a, b, c, rowtime]) + +- Exchange(distribution=[hash[a]]) + +- WatermarkAssigner(rowtime=[rowtime], watermark=[-(rowtime, 1000:INTERVAL SECOND)]) + +- Calc(select=[a, b, c, rowtime, PROCTIME() AS proctime]) + +- TableSourceScan(table=[[default_catalog, default_database, MyTable4]], fields=[a, b, c, rowtime]) +]]> + </Resource> + </TestCase> + <TestCase name="testOnCumulateWindowAggregateOnProctime"> + <Resource name="sql"> + <![CDATA[ +SELECT L.*, R.* +FROM ( + SELECT + a, + window_start, + window_end, + window_time, + count(*) as cnt, + count(distinct c) AS uv + FROM TABLE( + CUMULATE( + TABLE MyTable, DESCRIPTOR(proctime), INTERVAL '10' MINUTE, INTERVAL '1' HOUR)) + GROUP BY a, window_start, window_end, window_time +) L +JOIN ( + SELECT + a, + window_start, + window_end, + window_time, + count(*) as cnt, + count(distinct c) AS uv + FROM TABLE( + CUMULATE( + TABLE MyTable2, DESCRIPTOR(proctime), INTERVAL '10' MINUTE, INTERVAL '1' HOUR)) + GROUP BY a, window_start, window_end, window_time +) R +ON L.window_start = R.window_start AND L.window_end = R.window_end AND L.a = R.a + ]]> + </Resource> + <Resource name="ast"> + <![CDATA[ +LogicalProject(a=[$0], window_start=[$1], window_end=[$2], window_time=[$3], cnt=[$4], uv=[$5], a0=[$6], window_start0=[$7], window_end0=[$8], window_time0=[$9], cnt0=[$10], uv0=[$11]) ++- LogicalJoin(condition=[AND(=($1, $7), =($2, $8), =($0, $6))], joinType=[inner]) + :- LogicalAggregate(group=[{0, 1, 2, 3}], cnt=[COUNT()], uv=[COUNT(DISTINCT $4)]) + : +- LogicalProject(a=[$0], window_start=[$5], window_end=[$6], window_time=[$7], c=[$2]) + : +- LogicalTableFunctionScan(invocation=[CUMULATE($4, DESCRIPTOR($4), 600000:INTERVAL MINUTE, 3600000:INTERVAL HOUR)], rowType=[RecordType(INTEGER a, VARCHAR(2147483647) b, 
BIGINT c, TIME ATTRIBUTE(ROWTIME) rowtime, TIME ATTRIBUTE(PROCTIME) proctime, TIMESTAMP(3) window_start, TIMESTAMP(3) window_end, TIME ATTRIBUTE(PROCTIME) window_time)]) + : +- LogicalProject(a=[$0], b=[$1], c=[$2], rowtime=[$3], proctime=[$4]) + : +- LogicalWatermarkAssigner(rowtime=[rowtime], watermark=[-($3, 1000:INTERVAL SECOND)]) + : +- LogicalProject(a=[$0], b=[$1], c=[$2], rowtime=[$3], proctime=[PROCTIME()]) + : +- LogicalTableScan(table=[[default_catalog, default_database, MyTable]]) + +- LogicalAggregate(group=[{0, 1, 2, 3}], cnt=[COUNT()], uv=[COUNT(DISTINCT $4)]) + +- LogicalProject(a=[$0], window_start=[$5], window_end=[$6], window_time=[$7], c=[$2]) + +- LogicalTableFunctionScan(invocation=[CUMULATE($4, DESCRIPTOR($4), 600000:INTERVAL MINUTE, 3600000:INTERVAL HOUR)], rowType=[RecordType(INTEGER a, VARCHAR(2147483647) b, BIGINT c, TIME ATTRIBUTE(ROWTIME) rowtime, TIME ATTRIBUTE(PROCTIME) proctime, TIMESTAMP(3) window_start, TIMESTAMP(3) window_end, TIME ATTRIBUTE(PROCTIME) window_time)]) + +- LogicalProject(a=[$0], b=[$1], c=[$2], rowtime=[$3], proctime=[$4]) + +- LogicalWatermarkAssigner(rowtime=[rowtime], watermark=[-($3, 1000:INTERVAL SECOND)]) + +- LogicalProject(a=[$0], b=[$1], c=[$2], rowtime=[$3], proctime=[PROCTIME()]) + +- LogicalTableScan(table=[[default_catalog, default_database, MyTable2]]) +]]> + </Resource> + <Resource name="optimized rel plan"> + <![CDATA[ +Calc(select=[a, window_start, window_end, PROCTIME_MATERIALIZE(window_time) AS window_time, cnt, uv, a0, window_start0, window_end0, PROCTIME_MATERIALIZE(window_time0) AS window_time0, cnt0, uv0]) ++- WindowJoin(leftWindow=[CUMULATE(win_start=[window_start], win_end=[window_end], max_size=[1 h], step=[10 min])], rightWindow=[CUMULATE(win_start=[window_start], win_end=[window_end], max_size=[1 h], step=[10 min])], joinType=[InnerJoin], where=[=(a, a0)], select=[a, window_start, window_end, window_time, cnt, uv, a0, window_start0, window_end0, window_time0, cnt0, uv0]) + :- 
Exchange(distribution=[hash[a]]) + : +- Calc(select=[a, window_start, window_end, window_time, cnt, uv]) + : +- WindowAggregate(groupBy=[a], window=[CUMULATE(time_col=[proctime], max_size=[1 h], step=[10 min])], select=[a, COUNT(*) AS cnt, COUNT(DISTINCT c) AS uv, start('w$) AS window_start, end('w$) AS window_end, proctime('w$) AS window_time]) + : +- Exchange(distribution=[hash[a]]) + : +- Calc(select=[a, c, proctime]) + : +- WatermarkAssigner(rowtime=[rowtime], watermark=[-(rowtime, 1000:INTERVAL SECOND)]) + : +- Calc(select=[a, b, c, rowtime, PROCTIME() AS proctime]) + : +- TableSourceScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c, rowtime]) + +- Exchange(distribution=[hash[a]]) + +- Calc(select=[a, window_start, window_end, window_time, cnt, uv]) + +- WindowAggregate(groupBy=[a], window=[CUMULATE(time_col=[proctime], max_size=[1 h], step=[10 min])], select=[a, COUNT(*) AS cnt, COUNT(DISTINCT c) AS uv, start('w$) AS window_start, end('w$) AS window_end, proctime('w$) AS window_time]) + +- Exchange(distribution=[hash[a]]) + +- Calc(select=[a, c, proctime]) + +- WatermarkAssigner(rowtime=[rowtime], watermark=[-(rowtime, 1000:INTERVAL SECOND)]) + +- Calc(select=[a, b, c, rowtime, PROCTIME() AS proctime]) + +- TableSourceScan(table=[[default_catalog, default_database, MyTable2]], fields=[a, b, c, rowtime]) +]]> + </Resource> + </TestCase> + <TestCase name="testOnHopWindowAggregate"> + <Resource name="sql"> + <![CDATA[ +SELECT L.*, R.* +FROM ( + SELECT + a, + window_start, + window_end, + window_time, + count(*) as cnt, + count(distinct c) AS uv + FROM TABLE( + HOP(TABLE MyTable, DESCRIPTOR(rowtime), INTERVAL '5' MINUTE, INTERVAL '10' MINUTE)) + GROUP BY a, window_start, window_end, window_time +) L +JOIN ( + SELECT + a, + window_start, + window_end, + window_time, + count(*) as cnt, + count(distinct c) AS uv + FROM TABLE( + HOP(TABLE MyTable2, DESCRIPTOR(rowtime), INTERVAL '5' MINUTE, INTERVAL '10' MINUTE)) + GROUP BY a, window_start, 
window_end, window_time +) R +ON L.window_start = R.window_start AND L.window_end = R.window_end AND L.a = R.a + ]]> + </Resource> + <Resource name="ast"> + <![CDATA[ +LogicalProject(a=[$0], window_start=[$1], window_end=[$2], window_time=[$3], cnt=[$4], uv=[$5], a0=[$6], window_start0=[$7], window_end0=[$8], window_time0=[$9], cnt0=[$10], uv0=[$11]) ++- LogicalJoin(condition=[AND(=($1, $7), =($2, $8), =($0, $6))], joinType=[inner]) + :- LogicalAggregate(group=[{0, 1, 2, 3}], cnt=[COUNT()], uv=[COUNT(DISTINCT $4)]) + : +- LogicalProject(a=[$0], window_start=[$5], window_end=[$6], window_time=[$7], c=[$2]) + : +- LogicalTableFunctionScan(invocation=[HOP($4, DESCRIPTOR($3), 300000:INTERVAL MINUTE, 600000:INTERVAL MINUTE)], rowType=[RecordType(INTEGER a, VARCHAR(2147483647) b, BIGINT c, TIME ATTRIBUTE(ROWTIME) rowtime, TIME ATTRIBUTE(PROCTIME) proctime, TIMESTAMP(3) window_start, TIMESTAMP(3) window_end, TIME ATTRIBUTE(ROWTIME) window_time)]) + : +- LogicalProject(a=[$0], b=[$1], c=[$2], rowtime=[$3], proctime=[$4]) + : +- LogicalWatermarkAssigner(rowtime=[rowtime], watermark=[-($3, 1000:INTERVAL SECOND)]) + : +- LogicalProject(a=[$0], b=[$1], c=[$2], rowtime=[$3], proctime=[PROCTIME()]) + : +- LogicalTableScan(table=[[default_catalog, default_database, MyTable]]) + +- LogicalAggregate(group=[{0, 1, 2, 3}], cnt=[COUNT()], uv=[COUNT(DISTINCT $4)]) + +- LogicalProject(a=[$0], window_start=[$5], window_end=[$6], window_time=[$7], c=[$2]) + +- LogicalTableFunctionScan(invocation=[HOP($4, DESCRIPTOR($3), 300000:INTERVAL MINUTE, 600000:INTERVAL MINUTE)], rowType=[RecordType(INTEGER a, VARCHAR(2147483647) b, BIGINT c, TIME ATTRIBUTE(ROWTIME) rowtime, TIME ATTRIBUTE(PROCTIME) proctime, TIMESTAMP(3) window_start, TIMESTAMP(3) window_end, TIME ATTRIBUTE(ROWTIME) window_time)]) + +- LogicalProject(a=[$0], b=[$1], c=[$2], rowtime=[$3], proctime=[$4]) + +- LogicalWatermarkAssigner(rowtime=[rowtime], watermark=[-($3, 1000:INTERVAL SECOND)]) + +- LogicalProject(a=[$0], b=[$1], 
c=[$2], rowtime=[$3], proctime=[PROCTIME()]) + +- LogicalTableScan(table=[[default_catalog, default_database, MyTable2]]) +]]> + </Resource> + <Resource name="optimized rel plan"> + <![CDATA[ +WindowJoin(leftWindow=[HOP(win_start=[window_start], win_end=[window_end], size=[10 min], slide=[5 min])], rightWindow=[HOP(win_start=[window_start], win_end=[window_end], size=[10 min], slide=[5 min])], joinType=[InnerJoin], where=[=(a, a0)], select=[a, window_start, window_end, window_time, cnt, uv, a0, window_start0, window_end0, window_time0, cnt0, uv0]) +:- Exchange(distribution=[hash[a]]) +: +- Calc(select=[a, window_start, window_end, window_time, cnt, uv]) +: +- WindowAggregate(groupBy=[a], window=[HOP(time_col=[rowtime], size=[10 min], slide=[5 min])], select=[a, COUNT(*) AS cnt, COUNT(DISTINCT c) AS uv, start('w$) AS window_start, end('w$) AS window_end, rowtime('w$) AS window_time]) +: +- Exchange(distribution=[hash[a]]) +: +- Calc(select=[a, c, rowtime]) +: +- WatermarkAssigner(rowtime=[rowtime], watermark=[-(rowtime, 1000:INTERVAL SECOND)]) +: +- Calc(select=[a, b, c, rowtime, PROCTIME() AS proctime]) +: +- TableSourceScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c, rowtime]) ++- Exchange(distribution=[hash[a]]) + +- Calc(select=[a, window_start, window_end, window_time, cnt, uv]) + +- WindowAggregate(groupBy=[a], window=[HOP(time_col=[rowtime], size=[10 min], slide=[5 min])], select=[a, COUNT(*) AS cnt, COUNT(DISTINCT c) AS uv, start('w$) AS window_start, end('w$) AS window_end, rowtime('w$) AS window_time]) + +- Exchange(distribution=[hash[a]]) + +- Calc(select=[a, c, rowtime]) + +- WatermarkAssigner(rowtime=[rowtime], watermark=[-(rowtime, 1000:INTERVAL SECOND)]) + +- Calc(select=[a, b, c, rowtime, PROCTIME() AS proctime]) + +- TableSourceScan(table=[[default_catalog, default_database, MyTable2]], fields=[a, b, c, rowtime]) +]]> + </Resource> + </TestCase> + <TestCase name="testOnHopWindowAggregateOnProctime"> + <Resource name="sql"> 
+ <![CDATA[ +SELECT L.*, R.* +FROM ( + SELECT + a, + window_start, + window_end, + window_time, + count(*) as cnt, + count(distinct c) AS uv + FROM TABLE( + HOP(TABLE MyTable, DESCRIPTOR(proctime), INTERVAL '5' MINUTE, INTERVAL '10' MINUTE)) + GROUP BY a, window_start, window_end, window_time +) L +JOIN ( + SELECT + a, + window_start, + window_end, + window_time, + count(*) as cnt, + count(distinct c) AS uv + FROM TABLE( + HOP(TABLE MyTable2, DESCRIPTOR(proctime), INTERVAL '5' MINUTE, INTERVAL '10' MINUTE)) + GROUP BY a, window_start, window_end, window_time +) R +ON L.window_start = R.window_start AND L.window_end = R.window_end AND L.a = R.a + ]]> + </Resource> + <Resource name="ast"> + <![CDATA[ +LogicalProject(a=[$0], window_start=[$1], window_end=[$2], window_time=[$3], cnt=[$4], uv=[$5], a0=[$6], window_start0=[$7], window_end0=[$8], window_time0=[$9], cnt0=[$10], uv0=[$11]) ++- LogicalJoin(condition=[AND(=($1, $7), =($2, $8), =($0, $6))], joinType=[inner]) + :- LogicalAggregate(group=[{0, 1, 2, 3}], cnt=[COUNT()], uv=[COUNT(DISTINCT $4)]) + : +- LogicalProject(a=[$0], window_start=[$5], window_end=[$6], window_time=[$7], c=[$2]) + : +- LogicalTableFunctionScan(invocation=[HOP($4, DESCRIPTOR($4), 300000:INTERVAL MINUTE, 600000:INTERVAL MINUTE)], rowType=[RecordType(INTEGER a, VARCHAR(2147483647) b, BIGINT c, TIME ATTRIBUTE(ROWTIME) rowtime, TIME ATTRIBUTE(PROCTIME) proctime, TIMESTAMP(3) window_start, TIMESTAMP(3) window_end, TIME ATTRIBUTE(PROCTIME) window_time)]) + : +- LogicalProject(a=[$0], b=[$1], c=[$2], rowtime=[$3], proctime=[$4]) + : +- LogicalWatermarkAssigner(rowtime=[rowtime], watermark=[-($3, 1000:INTERVAL SECOND)]) + : +- LogicalProject(a=[$0], b=[$1], c=[$2], rowtime=[$3], proctime=[PROCTIME()]) + : +- LogicalTableScan(table=[[default_catalog, default_database, MyTable]]) + +- LogicalAggregate(group=[{0, 1, 2, 3}], cnt=[COUNT()], uv=[COUNT(DISTINCT $4)]) + +- LogicalProject(a=[$0], window_start=[$5], window_end=[$6], window_time=[$7], c=[$2]) + 
+- LogicalTableFunctionScan(invocation=[HOP($4, DESCRIPTOR($4), 300000:INTERVAL MINUTE, 600000:INTERVAL MINUTE)], rowType=[RecordType(INTEGER a, VARCHAR(2147483647) b, BIGINT c, TIME ATTRIBUTE(ROWTIME) rowtime, TIME ATTRIBUTE(PROCTIME) proctime, TIMESTAMP(3) window_start, TIMESTAMP(3) window_end, TIME ATTRIBUTE(PROCTIME) window_time)]) + +- LogicalProject(a=[$0], b=[$1], c=[$2], rowtime=[$3], proctime=[$4]) + +- LogicalWatermarkAssigner(rowtime=[rowtime], watermark=[-($3, 1000:INTERVAL SECOND)]) + +- LogicalProject(a=[$0], b=[$1], c=[$2], rowtime=[$3], proctime=[PROCTIME()]) + +- LogicalTableScan(table=[[default_catalog, default_database, MyTable2]]) +]]> + </Resource> + <Resource name="optimized rel plan"> + <![CDATA[ +Calc(select=[a, window_start, window_end, PROCTIME_MATERIALIZE(window_time) AS window_time, cnt, uv, a0, window_start0, window_end0, PROCTIME_MATERIALIZE(window_time0) AS window_time0, cnt0, uv0]) ++- WindowJoin(leftWindow=[HOP(win_start=[window_start], win_end=[window_end], size=[10 min], slide=[5 min])], rightWindow=[HOP(win_start=[window_start], win_end=[window_end], size=[10 min], slide=[5 min])], joinType=[InnerJoin], where=[=(a, a0)], select=[a, window_start, window_end, window_time, cnt, uv, a0, window_start0, window_end0, window_time0, cnt0, uv0]) + :- Exchange(distribution=[hash[a]]) + : +- Calc(select=[a, window_start, window_end, window_time, cnt, uv]) + : +- WindowAggregate(groupBy=[a], window=[HOP(time_col=[proctime], size=[10 min], slide=[5 min])], select=[a, COUNT(*) AS cnt, COUNT(DISTINCT c) AS uv, start('w$) AS window_start, end('w$) AS window_end, proctime('w$) AS window_time]) + : +- Exchange(distribution=[hash[a]]) + : +- Calc(select=[a, c, proctime]) + : +- WatermarkAssigner(rowtime=[rowtime], watermark=[-(rowtime, 1000:INTERVAL SECOND)]) + : +- Calc(select=[a, b, c, rowtime, PROCTIME() AS proctime]) + : +- TableSourceScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c, rowtime]) + +- 
Exchange(distribution=[hash[a]]) + +- Calc(select=[a, window_start, window_end, window_time, cnt, uv]) + +- WindowAggregate(groupBy=[a], window=[HOP(time_col=[proctime], size=[10 min], slide=[5 min])], select=[a, COUNT(*) AS cnt, COUNT(DISTINCT c) AS uv, start('w$) AS window_start, end('w$) AS window_end, proctime('w$) AS window_time]) + +- Exchange(distribution=[hash[a]]) + +- Calc(select=[a, c, proctime]) + +- WatermarkAssigner(rowtime=[rowtime], watermark=[-(rowtime, 1000:INTERVAL SECOND)]) + +- Calc(select=[a, b, c, rowtime, PROCTIME() AS proctime]) + +- TableSourceScan(table=[[default_catalog, default_database, MyTable2]], fields=[a, b, c, rowtime]) +]]> + </Resource> + </TestCase> + <TestCase name="testOnTumbleWindowAggregate"> + <Resource name="sql"> + <![CDATA[ +SELECT L.*, R.* +FROM ( + SELECT + a, + window_start, + window_end, + window_time, + count(*) as cnt, + count(distinct c) AS uv + FROM TABLE(TUMBLE(TABLE MyTable, DESCRIPTOR(rowtime), INTERVAL '15' MINUTE)) + GROUP BY a, window_start, window_end, window_time +) L +JOIN ( + SELECT + a, + window_start, + window_end, + window_time, + count(*) as cnt, + count(distinct c) AS uv + FROM TABLE(TUMBLE(TABLE MyTable2, DESCRIPTOR(rowtime), INTERVAL '15' MINUTE)) + GROUP BY a, window_start, window_end, window_time +) R +ON L.window_start = R.window_start AND L.window_end = R.window_end AND L.a = R.a + ]]> + </Resource> + <Resource name="ast"> + <![CDATA[ +LogicalProject(a=[$0], window_start=[$1], window_end=[$2], window_time=[$3], cnt=[$4], uv=[$5], a0=[$6], window_start0=[$7], window_end0=[$8], window_time0=[$9], cnt0=[$10], uv0=[$11]) ++- LogicalJoin(condition=[AND(=($1, $7), =($2, $8), =($0, $6))], joinType=[inner]) + :- LogicalAggregate(group=[{0, 1, 2, 3}], cnt=[COUNT()], uv=[COUNT(DISTINCT $4)]) + : +- LogicalProject(a=[$0], window_start=[$5], window_end=[$6], window_time=[$7], c=[$2]) + : +- LogicalTableFunctionScan(invocation=[TUMBLE($4, DESCRIPTOR($3), 900000:INTERVAL MINUTE)], 
rowType=[RecordType(INTEGER a, VARCHAR(2147483647) b, BIGINT c, TIME ATTRIBUTE(ROWTIME) rowtime, TIME ATTRIBUTE(PROCTIME) proctime, TIMESTAMP(3) window_start, TIMESTAMP(3) window_end, TIME ATTRIBUTE(ROWTIME) window_time)]) + : +- LogicalProject(a=[$0], b=[$1], c=[$2], rowtime=[$3], proctime=[$4]) + : +- LogicalWatermarkAssigner(rowtime=[rowtime], watermark=[-($3, 1000:INTERVAL SECOND)]) + : +- LogicalProject(a=[$0], b=[$1], c=[$2], rowtime=[$3], proctime=[PROCTIME()]) + : +- LogicalTableScan(table=[[default_catalog, default_database, MyTable]]) + +- LogicalAggregate(group=[{0, 1, 2, 3}], cnt=[COUNT()], uv=[COUNT(DISTINCT $4)]) + +- LogicalProject(a=[$0], window_start=[$5], window_end=[$6], window_time=[$7], c=[$2]) + +- LogicalTableFunctionScan(invocation=[TUMBLE($4, DESCRIPTOR($3), 900000:INTERVAL MINUTE)], rowType=[RecordType(INTEGER a, VARCHAR(2147483647) b, BIGINT c, TIME ATTRIBUTE(ROWTIME) rowtime, TIME ATTRIBUTE(PROCTIME) proctime, TIMESTAMP(3) window_start, TIMESTAMP(3) window_end, TIME ATTRIBUTE(ROWTIME) window_time)]) + +- LogicalProject(a=[$0], b=[$1], c=[$2], rowtime=[$3], proctime=[$4]) + +- LogicalWatermarkAssigner(rowtime=[rowtime], watermark=[-($3, 1000:INTERVAL SECOND)]) + +- LogicalProject(a=[$0], b=[$1], c=[$2], rowtime=[$3], proctime=[PROCTIME()]) + +- LogicalTableScan(table=[[default_catalog, default_database, MyTable2]]) +]]> + </Resource> + <Resource name="optimized rel plan"> + <![CDATA[ +WindowJoin(leftWindow=[TUMBLE(win_start=[window_start], win_end=[window_end], size=[15 min])], rightWindow=[TUMBLE(win_start=[window_start], win_end=[window_end], size=[15 min])], joinType=[InnerJoin], where=[=(a, a0)], select=[a, window_start, window_end, window_time, cnt, uv, a0, window_start0, window_end0, window_time0, cnt0, uv0]) +:- Exchange(distribution=[hash[a]]) +: +- Calc(select=[a, window_start, window_end, window_time, cnt, uv]) +: +- WindowAggregate(groupBy=[a], window=[TUMBLE(time_col=[rowtime], size=[15 min])], select=[a, COUNT(*) AS cnt, 
COUNT(DISTINCT c) AS uv, start('w$) AS window_start, end('w$) AS window_end, rowtime('w$) AS window_time]) +: +- Exchange(distribution=[hash[a]]) +: +- Calc(select=[a, c, rowtime]) +: +- WatermarkAssigner(rowtime=[rowtime], watermark=[-(rowtime, 1000:INTERVAL SECOND)]) +: +- Calc(select=[a, b, c, rowtime, PROCTIME() AS proctime]) +: +- TableSourceScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c, rowtime]) ++- Exchange(distribution=[hash[a]]) + +- Calc(select=[a, window_start, window_end, window_time, cnt, uv]) + +- WindowAggregate(groupBy=[a], window=[TUMBLE(time_col=[rowtime], size=[15 min])], select=[a, COUNT(*) AS cnt, COUNT(DISTINCT c) AS uv, start('w$) AS window_start, end('w$) AS window_end, rowtime('w$) AS window_time]) + +- Exchange(distribution=[hash[a]]) + +- Calc(select=[a, c, rowtime]) + +- WatermarkAssigner(rowtime=[rowtime], watermark=[-(rowtime, 1000:INTERVAL SECOND)]) + +- Calc(select=[a, b, c, rowtime, PROCTIME() AS proctime]) + +- TableSourceScan(table=[[default_catalog, default_database, MyTable2]], fields=[a, b, c, rowtime]) +]]> + </Resource> + </TestCase> + <TestCase name="testOnTumbleWindowAggregateOnProctime"> + <Resource name="sql"> + <![CDATA[ +SELECT L.*, R.* +FROM ( + SELECT + a, + window_start, + window_end, + window_time, + count(*) as cnt, + count(distinct c) AS uv + FROM TABLE(TUMBLE(TABLE MyTable, DESCRIPTOR(proctime), INTERVAL '15' MINUTE)) + GROUP BY a, window_start, window_end, window_time +) L +JOIN ( + SELECT + a, + window_start, + window_end, + window_time, + count(*) as cnt, + count(distinct c) AS uv + FROM TABLE(TUMBLE(TABLE MyTable2, DESCRIPTOR(proctime), INTERVAL '15' MINUTE)) + GROUP BY a, window_start, window_end, window_time +) R +ON L.window_start = R.window_start AND L.window_end = R.window_end AND L.a = R.a + ]]> + </Resource> + <Resource name="ast"> + <![CDATA[ +LogicalProject(a=[$0], window_start=[$1], window_end=[$2], window_time=[$3], cnt=[$4], uv=[$5], a0=[$6], window_start0=[$7], 
window_end0=[$8], window_time0=[$9], cnt0=[$10], uv0=[$11]) ++- LogicalJoin(condition=[AND(=($1, $7), =($2, $8), =($0, $6))], joinType=[inner]) + :- LogicalAggregate(group=[{0, 1, 2, 3}], cnt=[COUNT()], uv=[COUNT(DISTINCT $4)]) + : +- LogicalProject(a=[$0], window_start=[$5], window_end=[$6], window_time=[$7], c=[$2]) + : +- LogicalTableFunctionScan(invocation=[TUMBLE($4, DESCRIPTOR($4), 900000:INTERVAL MINUTE)], rowType=[RecordType(INTEGER a, VARCHAR(2147483647) b, BIGINT c, TIME ATTRIBUTE(ROWTIME) rowtime, TIME ATTRIBUTE(PROCTIME) proctime, TIMESTAMP(3) window_start, TIMESTAMP(3) window_end, TIME ATTRIBUTE(PROCTIME) window_time)]) + : +- LogicalProject(a=[$0], b=[$1], c=[$2], rowtime=[$3], proctime=[$4]) + : +- LogicalWatermarkAssigner(rowtime=[rowtime], watermark=[-($3, 1000:INTERVAL SECOND)]) + : +- LogicalProject(a=[$0], b=[$1], c=[$2], rowtime=[$3], proctime=[PROCTIME()]) + : +- LogicalTableScan(table=[[default_catalog, default_database, MyTable]]) + +- LogicalAggregate(group=[{0, 1, 2, 3}], cnt=[COUNT()], uv=[COUNT(DISTINCT $4)]) + +- LogicalProject(a=[$0], window_start=[$5], window_end=[$6], window_time=[$7], c=[$2]) + +- LogicalTableFunctionScan(invocation=[TUMBLE($4, DESCRIPTOR($4), 900000:INTERVAL MINUTE)], rowType=[RecordType(INTEGER a, VARCHAR(2147483647) b, BIGINT c, TIME ATTRIBUTE(ROWTIME) rowtime, TIME ATTRIBUTE(PROCTIME) proctime, TIMESTAMP(3) window_start, TIMESTAMP(3) window_end, TIME ATTRIBUTE(PROCTIME) window_time)]) + +- LogicalProject(a=[$0], b=[$1], c=[$2], rowtime=[$3], proctime=[$4]) + +- LogicalWatermarkAssigner(rowtime=[rowtime], watermark=[-($3, 1000:INTERVAL SECOND)]) + +- LogicalProject(a=[$0], b=[$1], c=[$2], rowtime=[$3], proctime=[PROCTIME()]) + +- LogicalTableScan(table=[[default_catalog, default_database, MyTable2]]) +]]> + </Resource> + <Resource name="optimized rel plan"> + <![CDATA[ +Calc(select=[a, window_start, window_end, PROCTIME_MATERIALIZE(window_time) AS window_time, cnt, uv, a0, window_start0, window_end0, 
PROCTIME_MATERIALIZE(window_time0) AS window_time0, cnt0, uv0]) ++- WindowJoin(leftWindow=[TUMBLE(win_start=[window_start], win_end=[window_end], size=[15 min])], rightWindow=[TUMBLE(win_start=[window_start], win_end=[window_end], size=[15 min])], joinType=[InnerJoin], where=[=(a, a0)], select=[a, window_start, window_end, window_time, cnt, uv, a0, window_start0, window_end0, window_time0, cnt0, uv0]) + :- Exchange(distribution=[hash[a]]) + : +- Calc(select=[a, window_start, window_end, window_time, cnt, uv]) + : +- WindowAggregate(groupBy=[a], window=[TUMBLE(time_col=[proctime], size=[15 min])], select=[a, COUNT(*) AS cnt, COUNT(DISTINCT c) AS uv, start('w$) AS window_start, end('w$) AS window_end, proctime('w$) AS window_time]) + : +- Exchange(distribution=[hash[a]]) + : +- Calc(select=[a, c, proctime]) + : +- WatermarkAssigner(rowtime=[rowtime], watermark=[-(rowtime, 1000:INTERVAL SECOND)]) + : +- Calc(select=[a, b, c, rowtime, PROCTIME() AS proctime]) + : +- TableSourceScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c, rowtime]) + +- Exchange(distribution=[hash[a]]) + +- Calc(select=[a, window_start, window_end, window_time, cnt, uv]) + +- WindowAggregate(groupBy=[a], window=[TUMBLE(time_col=[proctime], size=[15 min])], select=[a, COUNT(*) AS cnt, COUNT(DISTINCT c) AS uv, start('w$) AS window_start, end('w$) AS window_end, proctime('w$) AS window_time]) + +- Exchange(distribution=[hash[a]]) + +- Calc(select=[a, c, proctime]) + +- WatermarkAssigner(rowtime=[rowtime], watermark=[-(rowtime, 1000:INTERVAL SECOND)]) + +- Calc(select=[a, b, c, rowtime, PROCTIME() AS proctime]) + +- TableSourceScan(table=[[default_catalog, default_database, MyTable2]], fields=[a, b, c, rowtime]) +]]> + </Resource> + </TestCase> + <TestCase name="testTimeAttributePropagateForWindowJoin"> + <Resource name="sql"> + <![CDATA[ +SELECT tmp.*, MyTable3.* FROM tmp JOIN MyTable3 ON + tmp.a = MyTable3.a AND + tmp.rowtime BETWEEN + MyTable3.rowtime - INTERVAL '10' SECOND 
AND + MyTable3.rowtime + INTERVAL '1' HOUR +]]> + </Resource> + <Resource name="ast"> + <![CDATA[ +LogicalProject(rowtime=[$0], a=[$1], l_b=[$2], l_c=[$3], r_b=[$4], r_c=[$5], a0=[$6], b=[$7], c=[$8], rowtime0=[$9], proctime=[$10]) ++- LogicalJoin(condition=[AND(=($1, $6), >=($0, -($9, 10000:INTERVAL SECOND)), <=($0, +($9, 3600000:INTERVAL HOUR)))], joinType=[inner]) + :- LogicalProject(rowtime=[$7], a=[$0], l_b=[$1], l_c=[$2], r_b=[$9], r_c=[$10]) + : +- LogicalJoin(condition=[AND(=($5, $13), =($6, $14), =($0, $8))], joinType=[inner]) + : :- LogicalProject(a=[$0], b=[$1], c=[$2], rowtime=[$3], proctime=[$4], window_start=[$5], window_end=[$6], window_time=[$7]) + : : +- LogicalTableFunctionScan(invocation=[TUMBLE($4, DESCRIPTOR($3), 900000:INTERVAL MINUTE)], rowType=[RecordType(INTEGER a, VARCHAR(2147483647) b, BIGINT c, TIME ATTRIBUTE(ROWTIME) rowtime, TIME ATTRIBUTE(PROCTIME) proctime, TIMESTAMP(3) window_start, TIMESTAMP(3) window_end, TIME ATTRIBUTE(ROWTIME) window_time)]) + : : +- LogicalProject(a=[$0], b=[$1], c=[$2], rowtime=[$3], proctime=[$4]) + : : +- LogicalWatermarkAssigner(rowtime=[rowtime], watermark=[-($3, 1000:INTERVAL SECOND)]) + : : +- LogicalProject(a=[$0], b=[$1], c=[$2], rowtime=[$3], proctime=[PROCTIME()]) + : : +- LogicalTableScan(table=[[default_catalog, default_database, MyTable]]) + : +- LogicalProject(a=[$0], b=[$1], c=[$2], rowtime=[$3], proctime=[$4], window_start=[$5], window_end=[$6], window_time=[$7]) + : +- LogicalTableFunctionScan(invocation=[TUMBLE($4, DESCRIPTOR($3), 900000:INTERVAL MINUTE)], rowType=[RecordType(INTEGER a, VARCHAR(2147483647) b, BIGINT c, TIME ATTRIBUTE(ROWTIME) rowtime, TIME ATTRIBUTE(PROCTIME) proctime, TIMESTAMP(3) window_start, TIMESTAMP(3) window_end, TIME ATTRIBUTE(ROWTIME) window_time)]) + : +- LogicalProject(a=[$0], b=[$1], c=[$2], rowtime=[$3], proctime=[$4]) + : +- LogicalWatermarkAssigner(rowtime=[rowtime], watermark=[-($3, 1000:INTERVAL SECOND)]) + : +- LogicalProject(a=[$0], b=[$1], c=[$2], 
rowtime=[$3], proctime=[PROCTIME()]) + : +- LogicalTableScan(table=[[default_catalog, default_database, MyTable2]]) + +- LogicalWatermarkAssigner(rowtime=[rowtime], watermark=[-($3, 1000:INTERVAL SECOND)]) + +- LogicalProject(a=[$0], b=[$1], c=[$2], rowtime=[$3], proctime=[PROCTIME()]) + +- LogicalTableScan(table=[[default_catalog, default_database, MyTable3]]) +]]> + </Resource> + <Resource name="optimized rel plan"> + <![CDATA[ +Calc(select=[rowtime, a, l_b, l_c, r_b, r_c, a0, b, c, rowtime0, PROCTIME_MATERIALIZE(proctime) AS proctime]) ++- IntervalJoin(joinType=[InnerJoin], windowBounds=[isRowTime=true, leftLowerBound=-10000, leftUpperBound=3600000, leftTimeIndex=0, rightTimeIndex=3], where=[AND(=(a, a0), >=(rowtime, -(rowtime0, 10000:INTERVAL SECOND)), <=(rowtime, +(rowtime0, 3600000:INTERVAL HOUR)))], select=[rowtime, a, l_b, l_c, r_b, r_c, a0, b, c, rowtime0, proctime]) + :- Exchange(distribution=[hash[a]]) + : +- Calc(select=[window_time AS rowtime, a, b AS l_b, c AS l_c, b0 AS r_b, c0 AS r_c]) + : +- WindowJoin(leftWindow=[TUMBLE(win_start=[window_start], win_end=[window_end], size=[15 min])], rightWindow=[TUMBLE(win_start=[window_start], win_end=[window_end], size=[15 min])], joinType=[InnerJoin], where=[=(a, a0)], select=[a, b, c, window_start, window_end, window_time, a0, b0, c0, window_start0, window_end0]) + : :- Exchange(distribution=[hash[a]]) + : : +- Calc(select=[a, b, c, window_start, window_end, window_time]) + : : +- WindowTableFunction(window=[TUMBLE(time_col=[rowtime], size=[15 min])]) + : : +- WatermarkAssigner(rowtime=[rowtime], watermark=[-(rowtime, 1000:INTERVAL SECOND)]) + : : +- Calc(select=[a, b, c, rowtime, PROCTIME() AS proctime]) + : : +- TableSourceScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c, rowtime]) + : +- Exchange(distribution=[hash[a]]) + : +- Calc(select=[a, b, c, window_start, window_end]) + : +- WindowTableFunction(window=[TUMBLE(time_col=[rowtime], size=[15 min])]) + : +- 
WatermarkAssigner(rowtime=[rowtime], watermark=[-(rowtime, 1000:INTERVAL SECOND)]) + : +- Calc(select=[a, b, c, rowtime, PROCTIME() AS proctime]) + : +- TableSourceScan(table=[[default_catalog, default_database, MyTable2]], fields=[a, b, c, rowtime]) + +- Exchange(distribution=[hash[a]]) + +- WatermarkAssigner(rowtime=[rowtime], watermark=[-(rowtime, 1000:INTERVAL SECOND)]) + +- Calc(select=[a, b, c, rowtime, PROCTIME() AS proctime]) + +- TableSourceScan(table=[[default_catalog, default_database, MyTable3]], fields=[a, b, c, rowtime]) +]]> + </Resource> + </TestCase> + <TestCase name="testWindowJoinWithNonEqui"> + <Resource name="sql"> + <![CDATA[ +SELECT L.*, R.* +FROM ( + SELECT + a, + window_start, + window_end, + window_time, + count(*) as cnt, + count(distinct c) AS uv + FROM TABLE( + CUMULATE( + TABLE MyTable, DESCRIPTOR(proctime), INTERVAL '10' MINUTE, INTERVAL '1' HOUR)) + GROUP BY a, window_start, window_end, window_time +) L +JOIN ( + SELECT + a, + window_start, + window_end, + window_time, + count(*) as cnt, + count(distinct c) AS uv + FROM TABLE( + CUMULATE( + TABLE MyTable2, DESCRIPTOR(proctime), INTERVAL '10' MINUTE, INTERVAL '1' HOUR)) + GROUP BY a, window_start, window_end, window_time +) R +ON L.window_start = R.window_start AND L.window_end = R.window_end AND L.a = R.a AND + CAST(L.window_start AS BIGINT) > R.uv + ]]> + </Resource> + <Resource name="ast"> + <![CDATA[ +LogicalProject(a=[$0], window_start=[$1], window_end=[$2], window_time=[$3], cnt=[$4], uv=[$5], a0=[$6], window_start0=[$7], window_end0=[$8], window_time0=[$9], cnt0=[$10], uv0=[$11]) ++- LogicalJoin(condition=[AND(=($1, $7), =($2, $8), =($0, $6), >(CAST($1):BIGINT NOT NULL, $11))], joinType=[inner]) + :- LogicalAggregate(group=[{0, 1, 2, 3}], cnt=[COUNT()], uv=[COUNT(DISTINCT $4)]) + : +- LogicalProject(a=[$0], window_start=[$5], window_end=[$6], window_time=[$7], c=[$2]) + : +- LogicalTableFunctionScan(invocation=[CUMULATE($4, DESCRIPTOR($4), 600000:INTERVAL MINUTE, 
3600000:INTERVAL HOUR)], rowType=[RecordType(INTEGER a, VARCHAR(2147483647) b, BIGINT c, TIME ATTRIBUTE(ROWTIME) rowtime, TIME ATTRIBUTE(PROCTIME) proctime, TIMESTAMP(3) window_start, TIMESTAMP(3) window_end, TIME ATTRIBUTE(PROCTIME) window_time)]) + : +- LogicalProject(a=[$0], b=[$1], c=[$2], rowtime=[$3], proctime=[$4]) + : +- LogicalWatermarkAssigner(rowtime=[rowtime], watermark=[-($3, 1000:INTERVAL SECOND)]) + : +- LogicalProject(a=[$0], b=[$1], c=[$2], rowtime=[$3], proctime=[PROCTIME()]) + : +- LogicalTableScan(table=[[default_catalog, default_database, MyTable]]) + +- LogicalAggregate(group=[{0, 1, 2, 3}], cnt=[COUNT()], uv=[COUNT(DISTINCT $4)]) + +- LogicalProject(a=[$0], window_start=[$5], window_end=[$6], window_time=[$7], c=[$2]) + +- LogicalTableFunctionScan(invocation=[CUMULATE($4, DESCRIPTOR($4), 600000:INTERVAL MINUTE, 3600000:INTERVAL HOUR)], rowType=[RecordType(INTEGER a, VARCHAR(2147483647) b, BIGINT c, TIME ATTRIBUTE(ROWTIME) rowtime, TIME ATTRIBUTE(PROCTIME) proctime, TIMESTAMP(3) window_start, TIMESTAMP(3) window_end, TIME ATTRIBUTE(PROCTIME) window_time)]) + +- LogicalProject(a=[$0], b=[$1], c=[$2], rowtime=[$3], proctime=[$4]) + +- LogicalWatermarkAssigner(rowtime=[rowtime], watermark=[-($3, 1000:INTERVAL SECOND)]) + +- LogicalProject(a=[$0], b=[$1], c=[$2], rowtime=[$3], proctime=[PROCTIME()]) + +- LogicalTableScan(table=[[default_catalog, default_database, MyTable2]]) +]]> + </Resource> + <Resource name="optimized rel plan"> + <![CDATA[ +Calc(select=[a, window_start, window_end, PROCTIME_MATERIALIZE(window_time) AS window_time, cnt, uv, a0, window_start0, window_end0, PROCTIME_MATERIALIZE(window_time0) AS window_time0, cnt0, uv0]) ++- WindowJoin(leftWindow=[CUMULATE(win_start=[window_start], win_end=[window_end], max_size=[1 h], step=[10 min])], rightWindow=[CUMULATE(win_start=[window_start], win_end=[window_end], max_size=[1 h], step=[10 min])], joinType=[InnerJoin], where=[AND(=(a, a0), >(CAST(window_start), uv0))], select=[a, 
window_start, window_end, window_time, cnt, uv, a0, window_start0, window_end0, window_time0, cnt0, uv0]) + :- Exchange(distribution=[hash[a]]) + : +- Calc(select=[a, window_start, window_end, window_time, cnt, uv]) + : +- WindowAggregate(groupBy=[a], window=[CUMULATE(time_col=[proctime], max_size=[1 h], step=[10 min])], select=[a, COUNT(*) AS cnt, COUNT(DISTINCT c) AS uv, start('w$) AS window_start, end('w$) AS window_end, proctime('w$) AS window_time]) + : +- Exchange(distribution=[hash[a]]) + : +- Calc(select=[a, c, proctime]) + : +- WatermarkAssigner(rowtime=[rowtime], watermark=[-(rowtime, 1000:INTERVAL SECOND)]) + : +- Calc(select=[a, b, c, rowtime, PROCTIME() AS proctime]) + : +- TableSourceScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c, rowtime]) + +- Exchange(distribution=[hash[a]]) + +- Calc(select=[a, window_start, window_end, window_time, cnt, uv]) + +- WindowAggregate(groupBy=[a], window=[CUMULATE(time_col=[proctime], max_size=[1 h], step=[10 min])], select=[a, COUNT(*) AS cnt, COUNT(DISTINCT c) AS uv, start('w$) AS window_start, end('w$) AS window_end, proctime('w$) AS window_time]) + +- Exchange(distribution=[hash[a]]) + +- Calc(select=[a, c, proctime]) + +- WatermarkAssigner(rowtime=[rowtime], watermark=[-(rowtime, 1000:INTERVAL SECOND)]) + +- Calc(select=[a, b, c, rowtime, PROCTIME() AS proctime]) + +- TableSourceScan(table=[[default_catalog, default_database, MyTable2]], fields=[a, b, c, rowtime]) +]]> + </Resource> + </TestCase> + <TestCase name="testWindowPropertyPropagateForWindowJoin"> + <Resource name="sql"> + <![CDATA[ +SELECT * FROM +( + SELECT *, + ROW_NUMBER() OVER( + PARTITION BY window_start, window_end ORDER BY l_cnt DESC) as rownum + FROM tmp2 +) +WHERE rownum <= 3 + ]]> + </Resource> + <Resource name="ast"> + <![CDATA[ +LogicalProject(window_start=[$0], window_end=[$1], a=[$2], l_cnt=[$3], l_uv=[$4], r_cnt=[$5], r_uv=[$6], rownum=[$7]) ++- LogicalFilter(condition=[<=($7, 3)]) + +- 
LogicalProject(window_start=[$1], window_end=[$2], a=[$0], l_cnt=[$4], l_uv=[$5], r_cnt=[$10], r_uv=[$11], rownum=[ROW_NUMBER() OVER (PARTITION BY $1, $2 ORDER BY $4 DESC NULLS LAST)]) + +- LogicalJoin(condition=[AND(=($1, $7), =($2, $8), =($0, $6))], joinType=[inner]) + :- LogicalAggregate(group=[{0, 1, 2, 3}], cnt=[COUNT()], uv=[COUNT(DISTINCT $4)]) + : +- LogicalProject(a=[$0], window_start=[$5], window_end=[$6], window_time=[$7], c=[$2]) + : +- LogicalTableFunctionScan(invocation=[CUMULATE($4, DESCRIPTOR($3), 600000:INTERVAL MINUTE, 3600000:INTERVAL HOUR)], rowType=[RecordType(INTEGER a, VARCHAR(2147483647) b, BIGINT c, TIME ATTRIBUTE(ROWTIME) rowtime, TIME ATTRIBUTE(PROCTIME) proctime, TIMESTAMP(3) window_start, TIMESTAMP(3) window_end, TIME ATTRIBUTE(ROWTIME) window_time)]) + : +- LogicalProject(a=[$0], b=[$1], c=[$2], rowtime=[$3], proctime=[$4]) + : +- LogicalWatermarkAssigner(rowtime=[rowtime], watermark=[-($3, 1000:INTERVAL SECOND)]) + : +- LogicalProject(a=[$0], b=[$1], c=[$2], rowtime=[$3], proctime=[PROCTIME()]) + : +- LogicalTableScan(table=[[default_catalog, default_database, MyTable]]) + +- LogicalAggregate(group=[{0, 1, 2, 3}], cnt=[COUNT()], uv=[COUNT(DISTINCT $4)]) + +- LogicalProject(a=[$0], window_start=[$5], window_end=[$6], window_time=[$7], c=[$2]) + +- LogicalTableFunctionScan(invocation=[CUMULATE($4, DESCRIPTOR($3), 600000:INTERVAL MINUTE, 3600000:INTERVAL HOUR)], rowType=[RecordType(INTEGER a, VARCHAR(2147483647) b, BIGINT c, TIME ATTRIBUTE(ROWTIME) rowtime, TIME ATTRIBUTE(PROCTIME) proctime, TIMESTAMP(3) window_start, TIMESTAMP(3) window_end, TIME ATTRIBUTE(ROWTIME) window_time)]) + +- LogicalProject(a=[$0], b=[$1], c=[$2], rowtime=[$3], proctime=[$4]) + +- LogicalWatermarkAssigner(rowtime=[rowtime], watermark=[-($3, 1000:INTERVAL SECOND)]) + +- LogicalProject(a=[$0], b=[$1], c=[$2], rowtime=[$3], proctime=[PROCTIME()]) + +- LogicalTableScan(table=[[default_catalog, default_database, MyTable2]]) +]]> + </Resource> + <Resource 
name="optimized rel plan"> + <![CDATA[ +Calc(select=[window_start, window_end, a, cnt AS l_cnt, uv AS l_uv, cnt0 AS r_cnt, uv0 AS r_uv, rownum]) ++- Rank(strategy=[AppendFastStrategy], rankType=[ROW_NUMBER], rankRange=[rankStart=1, rankEnd=3], partitionBy=[window_start, window_end], orderBy=[cnt DESC], select=[a, window_start, window_end, window_time, cnt, uv, a0, window_start0, window_end0, window_time0, cnt0, uv0, rownum]) Review comment: Good catch, I missed windowProperties inference for `FlinkLogicalJoin` in `FlinkRelMdWindowProperties`. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: users@infra.apache.org
