xuyangzhong commented on code in PR #24162:
URL: https://github.com/apache/flink/pull/24162#discussion_r1467143192
##########
flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/agg/WindowAggregateTest.xml:
##########
@@ -2170,6 +2360,386 @@ Sink(table=[default_catalog.default_database.sink],
fields=[ws, we, b, c])
+- Exchange(distribution=[hash[b]])
+- Calc(select=[b, CASE((a = 11), 1, null:INTEGER) AS $f3, PROCTIME()
AS proctime], where=[SEARCH(a, Sarg[1, 5, 7, 9, 11])])
+- TableSourceScan(table=[[default_catalog, default_database,
source, project=[a, b], metadata=[]]], fields=[a, b])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase
name="testSession_DistinctSplitEnabled[aggPhaseEnforcer=ONE_PHASE]">
+ <Resource name="sql">
+ <![CDATA[
+SELECT
+ a,
+ window_start,
+ window_end,
+ count(*),
+ sum(d),
+ max(d) filter (where b > 1000),
+ count(distinct c) AS uv
+FROM TABLE(
+ SESSION(TABLE MyTable PARTITION BY a, DESCRIPTOR(proctime), INTERVAL '5'
MINUTE))
+GROUP BY a, window_start, window_end
+ ]]>
+ </Resource>
+ <Resource name="ast">
+ <![CDATA[
+LogicalAggregate(group=[{0, 1, 2}], EXPR$3=[COUNT()], EXPR$4=[SUM($3)],
EXPR$5=[MAX($3) FILTER $4], uv=[COUNT(DISTINCT $5)])
++- LogicalProject(a=[$0], window_start=[$7], window_end=[$8], d=[$3], $f4=[IS
TRUE(>($1, 1000))], c=[$2])
+ +- LogicalTableFunctionScan(invocation=[SESSION(PARTITION BY($0),
DESCRIPTOR($6), 300000:INTERVAL MINUTE)], rowType=[RecordType(INTEGER a, BIGINT
b, VARCHAR(2147483647) c, DECIMAL(10, 3) d, BIGINT e, TIMESTAMP(3) *ROWTIME*
rowtime, TIMESTAMP_LTZ(3) *PROCTIME* proctime, TIMESTAMP(3) window_start,
TIMESTAMP(3) window_end, TIMESTAMP_LTZ(3) *PROCTIME* window_time)])
+ +- LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4], rowtime=[$5],
proctime=[$6])
+ +- LogicalWatermarkAssigner(rowtime=[rowtime], watermark=[-($5,
1000:INTERVAL SECOND)])
+ +- LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4],
rowtime=[$5], proctime=[PROCTIME()])
+ +- LogicalTableScan(table=[[default_catalog, default_database,
MyTable]])
+]]>
+ </Resource>
+ <Resource name="optimized rel plan">
+ <![CDATA[
+Calc(select=[a, window_start, window_end, EXPR$3, EXPR$4, EXPR$5, uv])
++- WindowAggregate(groupBy=[a], window=[SESSION(time_col=[proctime], gap=[5
min], partition keys=[a])], select=[a, COUNT(*) AS EXPR$3, SUM(d) AS EXPR$4,
MAX(d) FILTER $f4 AS EXPR$5, COUNT(DISTINCT c) AS uv, start('w$) AS
window_start, end('w$) AS window_end])
+ +- Exchange(distribution=[hash[a]])
+ +- Calc(select=[a, d, IS TRUE(>(b, 1000)) AS $f4, c, proctime])
+ +- WatermarkAssigner(rowtime=[rowtime], watermark=[-(rowtime,
1000:INTERVAL SECOND)])
+ +- Calc(select=[a, b, c, d, PROCTIME() AS proctime, rowtime])
+ +- TableSourceScan(table=[[default_catalog, default_database,
MyTable, project=[a, b, c, d, rowtime], metadata=[]]], fields=[a, b, c, d,
rowtime])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase
name="testSession_DistinctSplitEnabled[aggPhaseEnforcer=TWO_PHASE]">
+ <Resource name="sql">
+ <![CDATA[
+SELECT
+ a,
+ window_start,
+ window_end,
+ count(*),
+ sum(d),
+ max(d) filter (where b > 1000),
+ count(distinct c) AS uv
+FROM TABLE(
+ SESSION(TABLE MyTable PARTITION BY a, DESCRIPTOR(proctime), INTERVAL '5'
MINUTE))
+GROUP BY a, window_start, window_end
+ ]]>
+ </Resource>
+ <Resource name="ast">
+ <![CDATA[
+LogicalAggregate(group=[{0, 1, 2}], EXPR$3=[COUNT()], EXPR$4=[SUM($3)],
EXPR$5=[MAX($3) FILTER $4], uv=[COUNT(DISTINCT $5)])
++- LogicalProject(a=[$0], window_start=[$7], window_end=[$8], d=[$3], $f4=[IS
TRUE(>($1, 1000))], c=[$2])
+ +- LogicalTableFunctionScan(invocation=[SESSION(PARTITION BY($0),
DESCRIPTOR($6), 300000:INTERVAL MINUTE)], rowType=[RecordType(INTEGER a, BIGINT
b, VARCHAR(2147483647) c, DECIMAL(10, 3) d, BIGINT e, TIMESTAMP(3) *ROWTIME*
rowtime, TIMESTAMP_LTZ(3) *PROCTIME* proctime, TIMESTAMP(3) window_start,
TIMESTAMP(3) window_end, TIMESTAMP_LTZ(3) *PROCTIME* window_time)])
+ +- LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4], rowtime=[$5],
proctime=[$6])
+ +- LogicalWatermarkAssigner(rowtime=[rowtime], watermark=[-($5,
1000:INTERVAL SECOND)])
+ +- LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4],
rowtime=[$5], proctime=[PROCTIME()])
+ +- LogicalTableScan(table=[[default_catalog, default_database,
MyTable]])
+]]>
+ </Resource>
+ <Resource name="optimized rel plan">
+ <![CDATA[
+Calc(select=[a, window_start, window_end, EXPR$3, EXPR$4, EXPR$5, uv])
++- WindowAggregate(groupBy=[a], window=[SESSION(time_col=[proctime], gap=[5
min], partition keys=[a])], select=[a, COUNT(*) AS EXPR$3, SUM(d) AS EXPR$4,
MAX(d) FILTER $f4 AS EXPR$5, COUNT(DISTINCT c) AS uv, start('w$) AS
window_start, end('w$) AS window_end])
+ +- Exchange(distribution=[hash[a]])
+ +- Calc(select=[a, d, IS TRUE(>(b, 1000)) AS $f4, c, proctime])
+ +- WatermarkAssigner(rowtime=[rowtime], watermark=[-(rowtime,
1000:INTERVAL SECOND)])
+ +- Calc(select=[a, b, c, d, PROCTIME() AS proctime, rowtime])
+ +- TableSourceScan(table=[[default_catalog, default_database,
MyTable, project=[a, b, c, d, rowtime], metadata=[]]], fields=[a, b, c, d,
rowtime])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testSession_OnProctime[aggPhaseEnforcer=TWO_PHASE]">
+ <Resource name="sql">
+ <![CDATA[
+SELECT
+ a,
+ window_start,
+ window_end,
+ count(*),
+ sum(d),
+ max(d) filter (where b > 1000),
+ weightedAvg(b, e) AS wAvg,
+ count(distinct c) AS uv
+FROM TABLE(
+ SESSION(TABLE MyTable PARTITION BY a, DESCRIPTOR(proctime), INTERVAL '5'
MINUTE))
+GROUP BY a, window_start, window_end
+ ]]>
+ </Resource>
+ <Resource name="ast">
+ <![CDATA[
+LogicalAggregate(group=[{0, 1, 2}], EXPR$3=[COUNT()], EXPR$4=[SUM($3)],
EXPR$5=[MAX($3) FILTER $4], wAvg=[weightedAvg($5, $6)], uv=[COUNT(DISTINCT $7)])
++- LogicalProject(a=[$0], window_start=[$7], window_end=[$8], d=[$3], $f4=[IS
TRUE(>($1, 1000))], b=[$1], e=[$4], c=[$2])
+ +- LogicalTableFunctionScan(invocation=[SESSION(PARTITION BY($0),
DESCRIPTOR($6), 300000:INTERVAL MINUTE)], rowType=[RecordType(INTEGER a, BIGINT
b, VARCHAR(2147483647) c, DECIMAL(10, 3) d, BIGINT e, TIMESTAMP(3) *ROWTIME*
rowtime, TIMESTAMP_LTZ(3) *PROCTIME* proctime, TIMESTAMP(3) window_start,
TIMESTAMP(3) window_end, TIMESTAMP_LTZ(3) *PROCTIME* window_time)])
+ +- LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4], rowtime=[$5],
proctime=[$6])
+ +- LogicalWatermarkAssigner(rowtime=[rowtime], watermark=[-($5,
1000:INTERVAL SECOND)])
+ +- LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4],
rowtime=[$5], proctime=[PROCTIME()])
+ +- LogicalTableScan(table=[[default_catalog, default_database,
MyTable]])
+]]>
+ </Resource>
+ <Resource name="optimized rel plan">
+ <![CDATA[
+Calc(select=[a, window_start, window_end, EXPR$3, EXPR$4, EXPR$5, wAvg, uv])
++- WindowAggregate(groupBy=[a], window=[SESSION(time_col=[proctime], gap=[5
min], partition keys=[a])], select=[a, COUNT(*) AS EXPR$3, SUM(d) AS EXPR$4,
MAX(d) FILTER $f4 AS EXPR$5, weightedAvg(b, e) AS wAvg, COUNT(DISTINCT c) AS
uv, start('w$) AS window_start, end('w$) AS window_end])
+ +- Exchange(distribution=[hash[a]])
+ +- Calc(select=[a, d, IS TRUE(>(b, 1000)) AS $f4, b, e, c, proctime])
+ +- WatermarkAssigner(rowtime=[rowtime], watermark=[-(rowtime,
1000:INTERVAL SECOND)])
+ +- Calc(select=[a, b, c, d, e, rowtime, PROCTIME() AS proctime])
+ +- TableSourceScan(table=[[default_catalog, default_database,
MyTable]], fields=[a, b, c, d, e, rowtime])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testSession_OnRowtime[aggPhaseEnforcer=ONE_PHASE]">
+ <Resource name="sql">
+ <![CDATA[
+SELECT
+ a,
+ window_start,
+ window_end,
+ count(*),
+ sum(d),
+ max(d) filter (where b > 1000),
+ weightedAvg(b, e) AS wAvg,
+ count(distinct c) AS uv
+FROM TABLE(
+ SESSION(TABLE MyTable PARTITION BY a, DESCRIPTOR(rowtime), INTERVAL '5'
MINUTE))
+GROUP BY a, window_start, window_end
+ ]]>
+ </Resource>
+ <Resource name="ast">
+ <![CDATA[
+LogicalAggregate(group=[{0, 1, 2}], EXPR$3=[COUNT()], EXPR$4=[SUM($3)],
EXPR$5=[MAX($3) FILTER $4], wAvg=[weightedAvg($5, $6)], uv=[COUNT(DISTINCT $7)])
++- LogicalProject(a=[$0], window_start=[$7], window_end=[$8], d=[$3], $f4=[IS
TRUE(>($1, 1000))], b=[$1], e=[$4], c=[$2])
+ +- LogicalTableFunctionScan(invocation=[SESSION(PARTITION BY($0),
DESCRIPTOR($5), 300000:INTERVAL MINUTE)], rowType=[RecordType(INTEGER a, BIGINT
b, VARCHAR(2147483647) c, DECIMAL(10, 3) d, BIGINT e, TIMESTAMP(3) *ROWTIME*
rowtime, TIMESTAMP_LTZ(3) *PROCTIME* proctime, TIMESTAMP(3) window_start,
TIMESTAMP(3) window_end, TIMESTAMP(3) *ROWTIME* window_time)])
+ +- LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4], rowtime=[$5],
proctime=[$6])
+ +- LogicalWatermarkAssigner(rowtime=[rowtime], watermark=[-($5,
1000:INTERVAL SECOND)])
+ +- LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4],
rowtime=[$5], proctime=[PROCTIME()])
+ +- LogicalTableScan(table=[[default_catalog, default_database,
MyTable]])
+]]>
+ </Resource>
+ <Resource name="optimized rel plan">
+ <![CDATA[
+Calc(select=[a, window_start, window_end, EXPR$3, EXPR$4, EXPR$5, wAvg, uv])
++- WindowAggregate(groupBy=[a], window=[SESSION(time_col=[rowtime], gap=[5
min], partition keys=[a])], select=[a, COUNT(*) AS EXPR$3, SUM(d) AS EXPR$4,
MAX(d) FILTER $f4 AS EXPR$5, weightedAvg(b, e) AS wAvg, COUNT(DISTINCT c) AS
uv, start('w$) AS window_start, end('w$) AS window_end])
+ +- Exchange(distribution=[hash[a]])
+ +- Calc(select=[a, d, IS TRUE(>(b, 1000)) AS $f4, b, e, c, rowtime])
+ +- WatermarkAssigner(rowtime=[rowtime], watermark=[-(rowtime,
1000:INTERVAL SECOND)])
+ +- TableSourceScan(table=[[default_catalog, default_database,
MyTable]], fields=[a, b, c, d, e, rowtime])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testSession_OnRowtime[aggPhaseEnforcer=TWO_PHASE]">
+ <Resource name="sql">
+ <![CDATA[
+SELECT
+ a,
+ window_start,
+ window_end,
+ count(*),
+ sum(d),
+ max(d) filter (where b > 1000),
+ weightedAvg(b, e) AS wAvg,
+ count(distinct c) AS uv
+FROM TABLE(
+ SESSION(TABLE MyTable PARTITION BY a, DESCRIPTOR(rowtime), INTERVAL '5'
MINUTE))
+GROUP BY a, window_start, window_end
+ ]]>
+ </Resource>
+ <Resource name="ast">
+ <![CDATA[
+LogicalAggregate(group=[{0, 1, 2}], EXPR$3=[COUNT()], EXPR$4=[SUM($3)],
EXPR$5=[MAX($3) FILTER $4], wAvg=[weightedAvg($5, $6)], uv=[COUNT(DISTINCT $7)])
++- LogicalProject(a=[$0], window_start=[$7], window_end=[$8], d=[$3], $f4=[IS
TRUE(>($1, 1000))], b=[$1], e=[$4], c=[$2])
+ +- LogicalTableFunctionScan(invocation=[SESSION(PARTITION BY($0),
DESCRIPTOR($5), 300000:INTERVAL MINUTE)], rowType=[RecordType(INTEGER a, BIGINT
b, VARCHAR(2147483647) c, DECIMAL(10, 3) d, BIGINT e, TIMESTAMP(3) *ROWTIME*
rowtime, TIMESTAMP_LTZ(3) *PROCTIME* proctime, TIMESTAMP(3) window_start,
TIMESTAMP(3) window_end, TIMESTAMP(3) *ROWTIME* window_time)])
+ +- LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4], rowtime=[$5],
proctime=[$6])
+ +- LogicalWatermarkAssigner(rowtime=[rowtime], watermark=[-($5,
1000:INTERVAL SECOND)])
+ +- LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4],
rowtime=[$5], proctime=[PROCTIME()])
+ +- LogicalTableScan(table=[[default_catalog, default_database,
MyTable]])
+]]>
+ </Resource>
+ <Resource name="optimized rel plan">
+ <![CDATA[
+Calc(select=[a, window_start, window_end, EXPR$3, EXPR$4, EXPR$5, wAvg, uv])
++- WindowAggregate(groupBy=[a], window=[SESSION(time_col=[rowtime], gap=[5
min], partition keys=[a])], select=[a, COUNT(*) AS EXPR$3, SUM(d) AS EXPR$4,
MAX(d) FILTER $f4 AS EXPR$5, weightedAvg(b, e) AS wAvg, COUNT(DISTINCT c) AS
uv, start('w$) AS window_start, end('w$) AS window_end])
+ +- Exchange(distribution=[hash[a]])
+ +- Calc(select=[a, d, IS TRUE(>(b, 1000)) AS $f4, b, e, c, rowtime])
+ +- WatermarkAssigner(rowtime=[rowtime], watermark=[-(rowtime,
1000:INTERVAL SECOND)])
+ +- TableSourceScan(table=[[default_catalog, default_database,
MyTable]], fields=[a, b, c, d, e, rowtime])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase
name="testSessionWindowTVFWithoutPartitionKeyWhenCantMerge[aggPhaseEnforcer=ONE_PHASE]">
+ <Resource name="sql">
+ <![CDATA[
+SELECT
+ window_start,
+ window_end,
+ count(*),
+ sum(d),
+ max(d) filter (where b > 1000),
+ weightedAvg(b, e) AS wAvg,
+ count(distinct c) AS uv
+FROM (
+ SELECT window_start, rowtime, d, proctime, e, b, c, window_end, window_time,
a
+ FROM TABLE(SESSION(TABLE MyTable, DESCRIPTOR(rowtime), INTERVAL '5' MINUTE))
+ WHERE window_start >= TIMESTAMP '2021-01-01 10:10:00.000'
+)
+GROUP BY window_start, window_end
+ ]]>
+ </Resource>
+ <Resource name="ast">
+ <![CDATA[
+LogicalAggregate(group=[{0, 1}], EXPR$2=[COUNT()], EXPR$3=[SUM($2)],
EXPR$4=[MAX($2) FILTER $3], wAvg=[weightedAvg($4, $5)], uv=[COUNT(DISTINCT $6)])
++- LogicalProject(window_start=[$0], window_end=[$7], d=[$2], $f3=[IS
TRUE(>($5, 1000))], b=[$5], e=[$4], c=[$6])
+ +- LogicalProject(window_start=[$7], rowtime=[$5], d=[$3], proctime=[$6],
e=[$4], b=[$1], c=[$2], window_end=[$8], window_time=[$9], a=[$0])
+ +- LogicalFilter(condition=[>=($7, 2021-01-01 10:10:00)])
+ +- LogicalTableFunctionScan(invocation=[SESSION(DESCRIPTOR($5),
300000:INTERVAL MINUTE)], rowType=[RecordType(INTEGER a, BIGINT b,
VARCHAR(2147483647) c, DECIMAL(10, 3) d, BIGINT e, TIMESTAMP(3) *ROWTIME*
rowtime, TIMESTAMP_LTZ(3) *PROCTIME* proctime, TIMESTAMP(3) window_start,
TIMESTAMP(3) window_end, TIMESTAMP(3) *ROWTIME* window_time)])
+ +- LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4],
rowtime=[$5], proctime=[$6])
+ +- LogicalWatermarkAssigner(rowtime=[rowtime], watermark=[-($5,
1000:INTERVAL SECOND)])
+ +- LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4],
rowtime=[$5], proctime=[PROCTIME()])
+ +- LogicalTableScan(table=[[default_catalog,
default_database, MyTable]])
+]]>
+ </Resource>
+ <Resource name="optimized rel plan">
+ <![CDATA[
+Calc(select=[window_start, window_end, EXPR$2, EXPR$3, EXPR$4, wAvg, uv])
++- WindowAggregate(window=[SESSION(win_start=[window_start],
win_end=[window_end], gap=[5 min])], select=[COUNT(*) AS EXPR$2, SUM(d) AS
EXPR$3, MAX(d) FILTER $f3 AS EXPR$4, weightedAvg(b, e) AS wAvg, COUNT(DISTINCT
c) AS uv, start('w$) AS window_start, end('w$) AS window_end])
+ +- Exchange(distribution=[single])
+ +- Calc(select=[window_start, window_end, d, IS TRUE(>(b, 1000)) AS $f3,
b, e, c], where=[>=(window_start, 2021-01-01 10:10:00)])
+ +- WindowTableFunction(window=[SESSION(time_col=[rowtime], gap=[5
min])])
+ +- Exchange(distribution=[single])
+ +- WatermarkAssigner(rowtime=[rowtime], watermark=[-(rowtime,
1000:INTERVAL SECOND)])
+ +- TableSourceScan(table=[[default_catalog,
default_database, MyTable, project=[b, c, d, e, rowtime], metadata=[]]],
fields=[b, c, d, e, rowtime])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase
name="testSessionWindowTVFWithoutPartitionKeyWhenCantMerge[aggPhaseEnforcer=TWO_PHASE]">
+ <Resource name="sql">
+ <![CDATA[
+SELECT
+ window_start,
+ window_end,
+ count(*),
+ sum(d),
+ max(d) filter (where b > 1000),
+ weightedAvg(b, e) AS wAvg,
+ count(distinct c) AS uv
+FROM (
+ SELECT window_start, rowtime, d, proctime, e, b, c, window_end, window_time,
a
+ FROM TABLE(SESSION(TABLE MyTable, DESCRIPTOR(rowtime), INTERVAL '5' MINUTE))
+ WHERE window_start >= TIMESTAMP '2021-01-01 10:10:00.000'
+)
+GROUP BY window_start, window_end
+ ]]>
+ </Resource>
+ <Resource name="ast">
+ <![CDATA[
+LogicalAggregate(group=[{0, 1}], EXPR$2=[COUNT()], EXPR$3=[SUM($2)],
EXPR$4=[MAX($2) FILTER $3], wAvg=[weightedAvg($4, $5)], uv=[COUNT(DISTINCT $6)])
++- LogicalProject(window_start=[$0], window_end=[$7], d=[$2], $f3=[IS
TRUE(>($5, 1000))], b=[$5], e=[$4], c=[$6])
+ +- LogicalProject(window_start=[$7], rowtime=[$5], d=[$3], proctime=[$6],
e=[$4], b=[$1], c=[$2], window_end=[$8], window_time=[$9], a=[$0])
+ +- LogicalFilter(condition=[>=($7, 2021-01-01 10:10:00)])
+ +- LogicalTableFunctionScan(invocation=[SESSION(DESCRIPTOR($5),
300000:INTERVAL MINUTE)], rowType=[RecordType(INTEGER a, BIGINT b,
VARCHAR(2147483647) c, DECIMAL(10, 3) d, BIGINT e, TIMESTAMP(3) *ROWTIME*
rowtime, TIMESTAMP_LTZ(3) *PROCTIME* proctime, TIMESTAMP(3) window_start,
TIMESTAMP(3) window_end, TIMESTAMP(3) *ROWTIME* window_time)])
+ +- LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4],
rowtime=[$5], proctime=[$6])
+ +- LogicalWatermarkAssigner(rowtime=[rowtime], watermark=[-($5,
1000:INTERVAL SECOND)])
+ +- LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4],
rowtime=[$5], proctime=[PROCTIME()])
+ +- LogicalTableScan(table=[[default_catalog,
default_database, MyTable]])
+]]>
+ </Resource>
+ <Resource name="optimized rel plan">
+ <![CDATA[
+Calc(select=[window_start, window_end, EXPR$2, EXPR$3, EXPR$4, wAvg, uv])
++- WindowAggregate(window=[SESSION(win_start=[window_start],
win_end=[window_end], gap=[5 min])], select=[COUNT(*) AS EXPR$2, SUM(d) AS
EXPR$3, MAX(d) FILTER $f3 AS EXPR$4, weightedAvg(b, e) AS wAvg, COUNT(DISTINCT
c) AS uv, start('w$) AS window_start, end('w$) AS window_end])
+ +- Exchange(distribution=[single])
+ +- Calc(select=[window_start, window_end, d, IS TRUE(>(b, 1000)) AS $f3,
b, e, c], where=[>=(window_start, 2021-01-01 10:10:00)])
+ +- WindowTableFunction(window=[SESSION(time_col=[rowtime], gap=[5
min])])
+ +- Exchange(distribution=[single])
+ +- WatermarkAssigner(rowtime=[rowtime], watermark=[-(rowtime,
1000:INTERVAL SECOND)])
+ +- TableSourceScan(table=[[default_catalog,
default_database, MyTable, project=[b, c, d, e, rowtime], metadata=[]]],
fields=[b, c, d, e, rowtime])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase
name="testSessionWindowTVFWithPartitionKeyWhenCantMerge[aggPhaseEnforcer=ONE_PHASE]">
+ <Resource name="sql">
+ <![CDATA[
+SELECT
+ window_start,
+ window_end,
+ a,
+ count(*),
+ sum(d),
+ max(d) filter (where b > 1000),
+ weightedAvg(b, e) AS wAvg,
+ count(distinct c) AS uv
+FROM (
+ SELECT window_start, rowtime, d, proctime, e, b, c, window_end, window_time,
a
+ FROM TABLE(SESSION(TABLE MyTable PARTITION BY a, DESCRIPTOR(rowtime),
INTERVAL '5' MINUTE))
+ WHERE window_start >= TIMESTAMP '2021-01-01 10:10:00.000'
+)
+GROUP BY a, window_start, window_end
+ ]]>
+ </Resource>
+ <Resource name="ast">
+ <![CDATA[
+LogicalProject(window_start=[$1], window_end=[$2], a=[$0], EXPR$3=[$3],
EXPR$4=[$4], EXPR$5=[$5], wAvg=[$6], uv=[$7])
++- LogicalAggregate(group=[{0, 1, 2}], EXPR$3=[COUNT()], EXPR$4=[SUM($3)],
EXPR$5=[MAX($3) FILTER $4], wAvg=[weightedAvg($5, $6)], uv=[COUNT(DISTINCT $7)])
+ +- LogicalProject(a=[$9], window_start=[$0], window_end=[$7], d=[$2],
$f4=[IS TRUE(>($5, 1000))], b=[$5], e=[$4], c=[$6])
+ +- LogicalProject(window_start=[$7], rowtime=[$5], d=[$3],
proctime=[$6], e=[$4], b=[$1], c=[$2], window_end=[$8], window_time=[$9],
a=[$0])
+ +- LogicalFilter(condition=[>=($7, 2021-01-01 10:10:00)])
+ +- LogicalTableFunctionScan(invocation=[SESSION(PARTITION BY($0),
DESCRIPTOR($5), 300000:INTERVAL MINUTE)], rowType=[RecordType(INTEGER a, BIGINT
b, VARCHAR(2147483647) c, DECIMAL(10, 3) d, BIGINT e, TIMESTAMP(3) *ROWTIME*
rowtime, TIMESTAMP_LTZ(3) *PROCTIME* proctime, TIMESTAMP(3) window_start,
TIMESTAMP(3) window_end, TIMESTAMP(3) *ROWTIME* window_time)])
+ +- LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4],
rowtime=[$5], proctime=[$6])
+ +- LogicalWatermarkAssigner(rowtime=[rowtime],
watermark=[-($5, 1000:INTERVAL SECOND)])
+ +- LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], e=[$4],
rowtime=[$5], proctime=[PROCTIME()])
+ +- LogicalTableScan(table=[[default_catalog,
default_database, MyTable]])
+]]>
+ </Resource>
+ <Resource name="optimized rel plan">
+ <![CDATA[
+Calc(select=[window_start, window_end, a, EXPR$3, EXPR$4, EXPR$5, wAvg, uv])
++- WindowAggregate(groupBy=[a], window=[SESSION(win_start=[window_start],
win_end=[window_end], gap=[5 min], partition keys=[a])], select=[a, COUNT(*) AS
EXPR$3, SUM(d) AS EXPR$4, MAX(d) FILTER $f4 AS EXPR$5, weightedAvg(b, e) AS
wAvg, COUNT(DISTINCT c) AS uv, start('w$) AS window_start, end('w$) AS
window_end])
+ +- Exchange(distribution=[hash[a]])
Review Comment:
Sure. https://issues.apache.org/jira/browse/FLINK-34238
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
users@infra.apache.org