xuyangzhong commented on code in PR #24162:
URL: https://github.com/apache/flink/pull/24162#discussion_r1464769669
##########
flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/agg/WindowAggregateTest.xml:
##########
@@ -2170,6 +2360,386 @@ Sink(table=[default_catalog.default_database.sink],
fields=[ws, we, b, c])
+- Exchange(distribution=[hash[b]])
+- Calc(select=[b, CASE((a = 11), 1, null:INTEGER) AS $f3, PROCTIME()
AS proctime], where=[SEARCH(a, Sarg[1, 5, 7, 9, 11])])
+- TableSourceScan(table=[[default_catalog, default_database,
source, project=[a, b], metadata=[]]], fields=[a, b])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase
name="testSession_DistinctSplitEnabled[aggPhaseEnforcer=ONE_PHASE]">
+ <Resource name="sql">
+ <![CDATA[
+SELECT
+ a,
+ window_start,
+ window_end,
+ count(*),
+ sum(d),
+ max(d) filter (where b > 1000),
+ count(distinct c) AS uv
+FROM TABLE(
+ SESSION(TABLE MyTable PARTITION BY a, DESCRIPTOR(proctime), INTERVAL '5'
MINUTE))
+GROUP BY a, window_start, window_end
+ ]]>
+ </Resource>
+ <Resource name="ast">
+ <![CDATA[
+LogicalAggregate(group=[{0, 1, 2}], EXPR$3=[COUNT()], EXPR$4=[SUM($3)],
EXPR$5=[MAX($3) FILTER $4], uv=[COUNT(DISTINCT $5)])
++- LogicalProject(a=[$0], window_start=[$7], window_end=[$8], d=[$3], $f4=[IS
TRUE(>($1, 1000))], c=[$2])
+ +- LogicalTableFunctionScan(invocation=[SESSION(PARTITION BY($0),
DESCRIPTOR($6), 300000:INTERVAL MINUTE)], rowType=[RecordType(INTEGER a, BIGINT
b, VARCHAR(2147483647) c, DECIMAL(10, 3) d, BIGINT e, TIMESTAMP(3) *ROWTIME*
rowtime, TIMESTAMP_LTZ(3) *PROCTIME* proctime, TIMESTAMP(3) window_start,
TIMESTAMP(3) window_end, TIMESTAMP_LTZ(3) *PROCTIME* window_time)])
+ +- LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4], rowtime=[$5],
proctime=[$6])
+ +- LogicalWatermarkAssigner(rowtime=[rowtime], watermark=[-($5,
1000:INTERVAL SECOND)])
+ +- LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4],
rowtime=[$5], proctime=[PROCTIME()])
+ +- LogicalTableScan(table=[[default_catalog, default_database,
MyTable]])
+]]>
+ </Resource>
+ <Resource name="optimized rel plan">
+ <![CDATA[
+Calc(select=[a, window_start, window_end, EXPR$3, EXPR$4, EXPR$5, uv])
++- WindowAggregate(groupBy=[a], window=[SESSION(time_col=[proctime], gap=[5
min], partition keys=[a])], select=[a, COUNT(*) AS EXPR$3, SUM(d) AS EXPR$4,
MAX(d) FILTER $f4 AS EXPR$5, COUNT(DISTINCT c) AS uv, start('w$) AS
window_start, end('w$) AS window_end])
+ +- Exchange(distribution=[hash[a]])
+ +- Calc(select=[a, d, IS TRUE(>(b, 1000)) AS $f4, c, proctime])
+ +- WatermarkAssigner(rowtime=[rowtime], watermark=[-(rowtime,
1000:INTERVAL SECOND)])
+ +- Calc(select=[a, b, c, d, PROCTIME() AS proctime, rowtime])
+ +- TableSourceScan(table=[[default_catalog, default_database,
MyTable, project=[a, b, c, d, rowtime], metadata=[]]], fields=[a, b, c, d,
rowtime])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase
name="testSession_DistinctSplitEnabled[aggPhaseEnforcer=TWO_PHASE]">
+ <Resource name="sql">
+ <![CDATA[
+SELECT
+ a,
+ window_start,
+ window_end,
+ count(*),
+ sum(d),
+ max(d) filter (where b > 1000),
+ count(distinct c) AS uv
+FROM TABLE(
+ SESSION(TABLE MyTable PARTITION BY a, DESCRIPTOR(proctime), INTERVAL '5'
MINUTE))
+GROUP BY a, window_start, window_end
+ ]]>
+ </Resource>
+ <Resource name="ast">
+ <![CDATA[
+LogicalAggregate(group=[{0, 1, 2}], EXPR$3=[COUNT()], EXPR$4=[SUM($3)],
EXPR$5=[MAX($3) FILTER $4], uv=[COUNT(DISTINCT $5)])
++- LogicalProject(a=[$0], window_start=[$7], window_end=[$8], d=[$3], $f4=[IS
TRUE(>($1, 1000))], c=[$2])
+ +- LogicalTableFunctionScan(invocation=[SESSION(PARTITION BY($0),
DESCRIPTOR($6), 300000:INTERVAL MINUTE)], rowType=[RecordType(INTEGER a, BIGINT
b, VARCHAR(2147483647) c, DECIMAL(10, 3) d, BIGINT e, TIMESTAMP(3) *ROWTIME*
rowtime, TIMESTAMP_LTZ(3) *PROCTIME* proctime, TIMESTAMP(3) window_start,
TIMESTAMP(3) window_end, TIMESTAMP_LTZ(3) *PROCTIME* window_time)])
+ +- LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4], rowtime=[$5],
proctime=[$6])
+ +- LogicalWatermarkAssigner(rowtime=[rowtime], watermark=[-($5,
1000:INTERVAL SECOND)])
+ +- LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4],
rowtime=[$5], proctime=[PROCTIME()])
+ +- LogicalTableScan(table=[[default_catalog, default_database,
MyTable]])
+]]>
+ </Resource>
+ <Resource name="optimized rel plan">
+ <![CDATA[
+Calc(select=[a, window_start, window_end, EXPR$3, EXPR$4, EXPR$5, uv])
++- WindowAggregate(groupBy=[a], window=[SESSION(time_col=[proctime], gap=[5
min], partition keys=[a])], select=[a, COUNT(*) AS EXPR$3, SUM(d) AS EXPR$4,
MAX(d) FILTER $f4 AS EXPR$5, COUNT(DISTINCT c) AS uv, start('w$) AS
window_start, end('w$) AS window_end])
+ +- Exchange(distribution=[hash[a]])
+ +- Calc(select=[a, d, IS TRUE(>(b, 1000)) AS $f4, c, proctime])
+ +- WatermarkAssigner(rowtime=[rowtime], watermark=[-(rowtime,
1000:INTERVAL SECOND)])
+ +- Calc(select=[a, b, c, d, PROCTIME() AS proctime, rowtime])
+ +- TableSourceScan(table=[[default_catalog, default_database,
MyTable, project=[a, b, c, d, rowtime], metadata=[]]], fields=[a, b, c, d,
rowtime])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testSession_OnProctime[aggPhaseEnforcer=TWO_PHASE]">
+ <Resource name="sql">
+ <![CDATA[
+SELECT
+ a,
+ window_start,
+ window_end,
+ count(*),
+ sum(d),
+ max(d) filter (where b > 1000),
+ weightedAvg(b, e) AS wAvg,
+ count(distinct c) AS uv
+FROM TABLE(
+ SESSION(TABLE MyTable PARTITION BY a, DESCRIPTOR(proctime), INTERVAL '5'
MINUTE))
+GROUP BY a, window_start, window_end
+ ]]>
+ </Resource>
+ <Resource name="ast">
+ <![CDATA[
+LogicalAggregate(group=[{0, 1, 2}], EXPR$3=[COUNT()], EXPR$4=[SUM($3)],
EXPR$5=[MAX($3) FILTER $4], wAvg=[weightedAvg($5, $6)], uv=[COUNT(DISTINCT $7)])
++- LogicalProject(a=[$0], window_start=[$7], window_end=[$8], d=[$3], $f4=[IS
TRUE(>($1, 1000))], b=[$1], e=[$4], c=[$2])
+ +- LogicalTableFunctionScan(invocation=[SESSION(PARTITION BY($0),
DESCRIPTOR($6), 300000:INTERVAL MINUTE)], rowType=[RecordType(INTEGER a, BIGINT
b, VARCHAR(2147483647) c, DECIMAL(10, 3) d, BIGINT e, TIMESTAMP(3) *ROWTIME*
rowtime, TIMESTAMP_LTZ(3) *PROCTIME* proctime, TIMESTAMP(3) window_start,
TIMESTAMP(3) window_end, TIMESTAMP_LTZ(3) *PROCTIME* window_time)])
+ +- LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4], rowtime=[$5],
proctime=[$6])
+ +- LogicalWatermarkAssigner(rowtime=[rowtime], watermark=[-($5,
1000:INTERVAL SECOND)])
+ +- LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4],
rowtime=[$5], proctime=[PROCTIME()])
+ +- LogicalTableScan(table=[[default_catalog, default_database,
MyTable]])
+]]>
+ </Resource>
+ <Resource name="optimized rel plan">
+ <![CDATA[
+Calc(select=[a, window_start, window_end, EXPR$3, EXPR$4, EXPR$5, wAvg, uv])
++- WindowAggregate(groupBy=[a], window=[SESSION(time_col=[proctime], gap=[5
min], partition keys=[a])], select=[a, COUNT(*) AS EXPR$3, SUM(d) AS EXPR$4,
MAX(d) FILTER $f4 AS EXPR$5, weightedAvg(b, e) AS wAvg, COUNT(DISTINCT c) AS
uv, start('w$) AS window_start, end('w$) AS window_end])
+ +- Exchange(distribution=[hash[a]])
+ +- Calc(select=[a, d, IS TRUE(>(b, 1000)) AS $f4, b, e, c, proctime])
+ +- WatermarkAssigner(rowtime=[rowtime], watermark=[-(rowtime,
1000:INTERVAL SECOND)])
+ +- Calc(select=[a, b, c, d, e, rowtime, PROCTIME() AS proctime])
+ +- TableSourceScan(table=[[default_catalog, default_database,
MyTable]], fields=[a, b, c, d, e, rowtime])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testSession_OnRowtime[aggPhaseEnforcer=ONE_PHASE]">
+ <Resource name="sql">
+ <![CDATA[
+SELECT
+ a,
+ window_start,
+ window_end,
+ count(*),
+ sum(d),
+ max(d) filter (where b > 1000),
+ weightedAvg(b, e) AS wAvg,
+ count(distinct c) AS uv
+FROM TABLE(
+ SESSION(TABLE MyTable PARTITION BY a, DESCRIPTOR(rowtime), INTERVAL '5'
MINUTE))
+GROUP BY a, window_start, window_end
+ ]]>
+ </Resource>
+ <Resource name="ast">
+ <![CDATA[
+LogicalAggregate(group=[{0, 1, 2}], EXPR$3=[COUNT()], EXPR$4=[SUM($3)],
EXPR$5=[MAX($3) FILTER $4], wAvg=[weightedAvg($5, $6)], uv=[COUNT(DISTINCT $7)])
++- LogicalProject(a=[$0], window_start=[$7], window_end=[$8], d=[$3], $f4=[IS
TRUE(>($1, 1000))], b=[$1], e=[$4], c=[$2])
+ +- LogicalTableFunctionScan(invocation=[SESSION(PARTITION BY($0),
DESCRIPTOR($5), 300000:INTERVAL MINUTE)], rowType=[RecordType(INTEGER a, BIGINT
b, VARCHAR(2147483647) c, DECIMAL(10, 3) d, BIGINT e, TIMESTAMP(3) *ROWTIME*
rowtime, TIMESTAMP_LTZ(3) *PROCTIME* proctime, TIMESTAMP(3) window_start,
TIMESTAMP(3) window_end, TIMESTAMP(3) *ROWTIME* window_time)])
+ +- LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4], rowtime=[$5],
proctime=[$6])
+ +- LogicalWatermarkAssigner(rowtime=[rowtime], watermark=[-($5,
1000:INTERVAL SECOND)])
+ +- LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4],
rowtime=[$5], proctime=[PROCTIME()])
+ +- LogicalTableScan(table=[[default_catalog, default_database,
MyTable]])
+]]>
+ </Resource>
+ <Resource name="optimized rel plan">
+ <![CDATA[
+Calc(select=[a, window_start, window_end, EXPR$3, EXPR$4, EXPR$5, wAvg, uv])
++- WindowAggregate(groupBy=[a], window=[SESSION(time_col=[rowtime], gap=[5
min], partition keys=[a])], select=[a, COUNT(*) AS EXPR$3, SUM(d) AS EXPR$4,
MAX(d) FILTER $f4 AS EXPR$5, weightedAvg(b, e) AS wAvg, COUNT(DISTINCT c) AS
uv, start('w$) AS window_start, end('w$) AS window_end])
+ +- Exchange(distribution=[hash[a]])
+ +- Calc(select=[a, d, IS TRUE(>(b, 1000)) AS $f4, b, e, c, rowtime])
+ +- WatermarkAssigner(rowtime=[rowtime], watermark=[-(rowtime,
1000:INTERVAL SECOND)])
+ +- TableSourceScan(table=[[default_catalog, default_database,
MyTable]], fields=[a, b, c, d, e, rowtime])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testSession_OnRowtime[aggPhaseEnforcer=TWO_PHASE]">
+ <Resource name="sql">
+ <![CDATA[
+SELECT
+ a,
+ window_start,
+ window_end,
+ count(*),
+ sum(d),
+ max(d) filter (where b > 1000),
+ weightedAvg(b, e) AS wAvg,
+ count(distinct c) AS uv
+FROM TABLE(
+ SESSION(TABLE MyTable PARTITION BY a, DESCRIPTOR(rowtime), INTERVAL '5'
MINUTE))
+GROUP BY a, window_start, window_end
+ ]]>
+ </Resource>
+ <Resource name="ast">
+ <![CDATA[
+LogicalAggregate(group=[{0, 1, 2}], EXPR$3=[COUNT()], EXPR$4=[SUM($3)],
EXPR$5=[MAX($3) FILTER $4], wAvg=[weightedAvg($5, $6)], uv=[COUNT(DISTINCT $7)])
++- LogicalProject(a=[$0], window_start=[$7], window_end=[$8], d=[$3], $f4=[IS
TRUE(>($1, 1000))], b=[$1], e=[$4], c=[$2])
+ +- LogicalTableFunctionScan(invocation=[SESSION(PARTITION BY($0),
DESCRIPTOR($5), 300000:INTERVAL MINUTE)], rowType=[RecordType(INTEGER a, BIGINT
b, VARCHAR(2147483647) c, DECIMAL(10, 3) d, BIGINT e, TIMESTAMP(3) *ROWTIME*
rowtime, TIMESTAMP_LTZ(3) *PROCTIME* proctime, TIMESTAMP(3) window_start,
TIMESTAMP(3) window_end, TIMESTAMP(3) *ROWTIME* window_time)])
+ +- LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4], rowtime=[$5],
proctime=[$6])
+ +- LogicalWatermarkAssigner(rowtime=[rowtime], watermark=[-($5,
1000:INTERVAL SECOND)])
+ +- LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4],
rowtime=[$5], proctime=[PROCTIME()])
+ +- LogicalTableScan(table=[[default_catalog, default_database,
MyTable]])
+]]>
+ </Resource>
+ <Resource name="optimized rel plan">
+ <![CDATA[
+Calc(select=[a, window_start, window_end, EXPR$3, EXPR$4, EXPR$5, wAvg, uv])
++- WindowAggregate(groupBy=[a], window=[SESSION(time_col=[rowtime], gap=[5
min], partition keys=[a])], select=[a, COUNT(*) AS EXPR$3, SUM(d) AS EXPR$4,
MAX(d) FILTER $f4 AS EXPR$5, weightedAvg(b, e) AS wAvg, COUNT(DISTINCT c) AS
uv, start('w$) AS window_start, end('w$) AS window_end])
+ +- Exchange(distribution=[hash[a]])
+ +- Calc(select=[a, d, IS TRUE(>(b, 1000)) AS $f4, b, e, c, rowtime])
+ +- WatermarkAssigner(rowtime=[rowtime], watermark=[-(rowtime,
1000:INTERVAL SECOND)])
+ +- TableSourceScan(table=[[default_catalog, default_database,
MyTable]], fields=[a, b, c, d, e, rowtime])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase
name="testSessionWindowTVFWithoutPartitionKeyWhenCantMerge[aggPhaseEnforcer=ONE_PHASE]">
+ <Resource name="sql">
+ <![CDATA[
+SELECT
+ window_start,
+ window_end,
+ count(*),
+ sum(d),
+ max(d) filter (where b > 1000),
+ weightedAvg(b, e) AS wAvg,
+ count(distinct c) AS uv
+FROM (
+ SELECT window_start, rowtime, d, proctime, e, b, c, window_end, window_time,
a
+ FROM TABLE(SESSION(TABLE MyTable, DESCRIPTOR(rowtime), INTERVAL '5' MINUTE))
+ WHERE window_start >= TIMESTAMP '2021-01-01 10:10:00.000'
+)
+GROUP BY window_start, window_end
+ ]]>
+ </Resource>
+ <Resource name="ast">
+ <![CDATA[
+LogicalAggregate(group=[{0, 1}], EXPR$2=[COUNT()], EXPR$3=[SUM($2)],
EXPR$4=[MAX($2) FILTER $3], wAvg=[weightedAvg($4, $5)], uv=[COUNT(DISTINCT $6)])
++- LogicalProject(window_start=[$0], window_end=[$7], d=[$2], $f3=[IS
TRUE(>($5, 1000))], b=[$5], e=[$4], c=[$6])
+ +- LogicalProject(window_start=[$7], rowtime=[$5], d=[$3], proctime=[$6],
e=[$4], b=[$1], c=[$2], window_end=[$8], window_time=[$9], a=[$0])
+ +- LogicalFilter(condition=[>=($7, 2021-01-01 10:10:00)])
+ +- LogicalTableFunctionScan(invocation=[SESSION(DESCRIPTOR($5),
300000:INTERVAL MINUTE)], rowType=[RecordType(INTEGER a, BIGINT b,
VARCHAR(2147483647) c, DECIMAL(10, 3) d, BIGINT e, TIMESTAMP(3) *ROWTIME*
rowtime, TIMESTAMP_LTZ(3) *PROCTIME* proctime, TIMESTAMP(3) window_start,
TIMESTAMP(3) window_end, TIMESTAMP(3) *ROWTIME* window_time)])
+ +- LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4],
rowtime=[$5], proctime=[$6])
+ +- LogicalWatermarkAssigner(rowtime=[rowtime], watermark=[-($5,
1000:INTERVAL SECOND)])
+ +- LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4],
rowtime=[$5], proctime=[PROCTIME()])
+ +- LogicalTableScan(table=[[default_catalog,
default_database, MyTable]])
+]]>
+ </Resource>
+ <Resource name="optimized rel plan">
+ <![CDATA[
+Calc(select=[window_start, window_end, EXPR$2, EXPR$3, EXPR$4, wAvg, uv])
++- WindowAggregate(window=[SESSION(win_start=[window_start],
win_end=[window_end], gap=[5 min])], select=[COUNT(*) AS EXPR$2, SUM(d) AS
EXPR$3, MAX(d) FILTER $f3 AS EXPR$4, weightedAvg(b, e) AS wAvg, COUNT(DISTINCT
c) AS uv, start('w$) AS window_start, end('w$) AS window_end])
+ +- Exchange(distribution=[single])
+ +- Calc(select=[window_start, window_end, d, IS TRUE(>(b, 1000)) AS $f3,
b, e, c], where=[>=(window_start, 2021-01-01 10:10:00)])
+ +- WindowTableFunction(window=[SESSION(time_col=[rowtime], gap=[5
min])])
+ +- Exchange(distribution=[single])
+ +- WatermarkAssigner(rowtime=[rowtime], watermark=[-(rowtime,
1000:INTERVAL SECOND)])
+ +- TableSourceScan(table=[[default_catalog,
default_database, MyTable, project=[b, c, d, e, rowtime], metadata=[]]],
fields=[b, c, d, e, rowtime])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase
name="testSessionWindowTVFWithoutPartitionKeyWhenCantMerge[aggPhaseEnforcer=TWO_PHASE]">
+ <Resource name="sql">
+ <![CDATA[
+SELECT
+ window_start,
+ window_end,
+ count(*),
+ sum(d),
+ max(d) filter (where b > 1000),
+ weightedAvg(b, e) AS wAvg,
+ count(distinct c) AS uv
+FROM (
+ SELECT window_start, rowtime, d, proctime, e, b, c, window_end, window_time,
a
+ FROM TABLE(SESSION(TABLE MyTable, DESCRIPTOR(rowtime), INTERVAL '5' MINUTE))
+ WHERE window_start >= TIMESTAMP '2021-01-01 10:10:00.000'
+)
+GROUP BY window_start, window_end
+ ]]>
+ </Resource>
+ <Resource name="ast">
+ <![CDATA[
+LogicalAggregate(group=[{0, 1}], EXPR$2=[COUNT()], EXPR$3=[SUM($2)],
EXPR$4=[MAX($2) FILTER $3], wAvg=[weightedAvg($4, $5)], uv=[COUNT(DISTINCT $6)])
++- LogicalProject(window_start=[$0], window_end=[$7], d=[$2], $f3=[IS
TRUE(>($5, 1000))], b=[$5], e=[$4], c=[$6])
+ +- LogicalProject(window_start=[$7], rowtime=[$5], d=[$3], proctime=[$6],
e=[$4], b=[$1], c=[$2], window_end=[$8], window_time=[$9], a=[$0])
+ +- LogicalFilter(condition=[>=($7, 2021-01-01 10:10:00)])
+ +- LogicalTableFunctionScan(invocation=[SESSION(DESCRIPTOR($5),
300000:INTERVAL MINUTE)], rowType=[RecordType(INTEGER a, BIGINT b,
VARCHAR(2147483647) c, DECIMAL(10, 3) d, BIGINT e, TIMESTAMP(3) *ROWTIME*
rowtime, TIMESTAMP_LTZ(3) *PROCTIME* proctime, TIMESTAMP(3) window_start,
TIMESTAMP(3) window_end, TIMESTAMP(3) *ROWTIME* window_time)])
+ +- LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4],
rowtime=[$5], proctime=[$6])
+ +- LogicalWatermarkAssigner(rowtime=[rowtime], watermark=[-($5,
1000:INTERVAL SECOND)])
+ +- LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4],
rowtime=[$5], proctime=[PROCTIME()])
+ +- LogicalTableScan(table=[[default_catalog,
default_database, MyTable]])
+]]>
+ </Resource>
+ <Resource name="optimized rel plan">
+ <![CDATA[
+Calc(select=[window_start, window_end, EXPR$2, EXPR$3, EXPR$4, wAvg, uv])
++- WindowAggregate(window=[SESSION(win_start=[window_start],
win_end=[window_end], gap=[5 min])], select=[COUNT(*) AS EXPR$2, SUM(d) AS
EXPR$3, MAX(d) FILTER $f3 AS EXPR$4, weightedAvg(b, e) AS wAvg, COUNT(DISTINCT
c) AS uv, start('w$) AS window_start, end('w$) AS window_end])
+ +- Exchange(distribution=[single])
+ +- Calc(select=[window_start, window_end, d, IS TRUE(>(b, 1000)) AS $f3,
b, e, c], where=[>=(window_start, 2021-01-01 10:10:00)])
+ +- WindowTableFunction(window=[SESSION(time_col=[rowtime], gap=[5
min])])
+ +- Exchange(distribution=[single])
+ +- WatermarkAssigner(rowtime=[rowtime], watermark=[-(rowtime,
1000:INTERVAL SECOND)])
+ +- TableSourceScan(table=[[default_catalog,
default_database, MyTable, project=[b, c, d, e, rowtime], metadata=[]]],
fields=[b, c, d, e, rowtime])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase
name="testSessionWindowTVFWithPartitionKeyWhenCantMerge[aggPhaseEnforcer=ONE_PHASE]">
+ <Resource name="sql">
+ <![CDATA[
+SELECT
+ window_start,
+ window_end,
+ a,
+ count(*),
+ sum(d),
+ max(d) filter (where b > 1000),
+ weightedAvg(b, e) AS wAvg,
+ count(distinct c) AS uv
+FROM (
+ SELECT window_start, rowtime, d, proctime, e, b, c, window_end, window_time,
a
+ FROM TABLE(SESSION(TABLE MyTable PARTITION BY a, DESCRIPTOR(rowtime),
INTERVAL '5' MINUTE))
+ WHERE window_start >= TIMESTAMP '2021-01-01 10:10:00.000'
+)
+GROUP BY a, window_start, window_end
+ ]]>
+ </Resource>
+ <Resource name="ast">
+ <![CDATA[
+LogicalProject(window_start=[$1], window_end=[$2], a=[$0], EXPR$3=[$3],
EXPR$4=[$4], EXPR$5=[$5], wAvg=[$6], uv=[$7])
++- LogicalAggregate(group=[{0, 1, 2}], EXPR$3=[COUNT()], EXPR$4=[SUM($3)],
EXPR$5=[MAX($3) FILTER $4], wAvg=[weightedAvg($5, $6)], uv=[COUNT(DISTINCT $7)])
+ +- LogicalProject(a=[$9], window_start=[$0], window_end=[$7], d=[$2],
$f4=[IS TRUE(>($5, 1000))], b=[$5], e=[$4], c=[$6])
+ +- LogicalProject(window_start=[$7], rowtime=[$5], d=[$3],
proctime=[$6], e=[$4], b=[$1], c=[$2], window_end=[$8], window_time=[$9],
a=[$0])
+ +- LogicalFilter(condition=[>=($7, 2021-01-01 10:10:00)])
+ +- LogicalTableFunctionScan(invocation=[SESSION(PARTITION BY($0),
DESCRIPTOR($5), 300000:INTERVAL MINUTE)], rowType=[RecordType(INTEGER a, BIGINT
b, VARCHAR(2147483647) c, DECIMAL(10, 3) d, BIGINT e, TIMESTAMP(3) *ROWTIME*
rowtime, TIMESTAMP_LTZ(3) *PROCTIME* proctime, TIMESTAMP(3) window_start,
TIMESTAMP(3) window_end, TIMESTAMP(3) *ROWTIME* window_time)])
+ +- LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4],
rowtime=[$5], proctime=[$6])
+ +- LogicalWatermarkAssigner(rowtime=[rowtime],
watermark=[-($5, 1000:INTERVAL SECOND)])
+ +- LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4],
rowtime=[$5], proctime=[PROCTIME()])
+ +- LogicalTableScan(table=[[default_catalog,
default_database, MyTable]])
+]]>
+ </Resource>
+ <Resource name="optimized rel plan">
+ <![CDATA[
+Calc(select=[window_start, window_end, a, EXPR$3, EXPR$4, EXPR$5, wAvg, uv])
++- WindowAggregate(groupBy=[a], window=[SESSION(win_start=[window_start],
win_end=[window_end], gap=[5 min], partition keys=[a])], select=[a, COUNT(*) AS
EXPR$3, SUM(d) AS EXPR$4, MAX(d) FILTER $f4 AS EXPR$5, weightedAvg(b, e) AS
wAvg, COUNT(DISTINCT c) AS uv, start('w$) AS window_start, end('w$) AS
window_end])
+ +- Exchange(distribution=[hash[a]])
Review Comment:
This relies heavily on whether their parallelism is consistent,
right?
In some scenarios, this exchange can be omitted. But how about discussing
that as a separate optimization, outside the scope of this PR?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]