danny0405 commented on a change in pull request #10123:
[FLINK-14665][table-planner-blink] Support computed column for create…
URL: https://github.com/apache/flink/pull/10123#discussion_r344602758
##########
File path:
flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/MiniBatchIntervalInferTest.xml
##########
@@ -221,76 +221,70 @@ Calc(select=[a, b])
LogicalSink(name=[`default_catalog`.`default_database`.`appendSink1`],
fields=[a, b])
+- LogicalProject(id1=[$1], EXPR$1=[$2])
+- LogicalAggregate(group=[{0, 1}], EXPR$1=[LISTAGG($2, $3)])
- +- LogicalProject($f0=[HOP($2, 12000:INTERVAL SECOND, 4000:INTERVAL
SECOND)], id1=[$0], text=[$1], $f3=[_UTF-16LE'*'])
- +- LogicalProject(id1=[$1], text=[$2], ts=[TUMBLE_ROWTIME($0)])
- +- LogicalAggregate(group=[{0, 1}], text=[LISTAGG($2, $3)])
- +- LogicalProject($f0=[TUMBLE($1, 6000:INTERVAL SECOND)],
id1=[$0], text=[$2], $f3=[_UTF-16LE'#'])
- +- LogicalProject(id1=[$0], ts=[$1], text=[$2])
- +- LogicalFilter(condition=[AND(=($0, $3), >($1, -($4,
300000:INTERVAL MINUTE)), <($1, +($4, 180000:INTERVAL MINUTE)))])
- +- LogicalJoin(condition=[true], joinType=[inner])
- :- LogicalWatermarkAssigner(fields=[id1, rowtime,
text], rowtimeField=[rowtime], watermarkDelay=[0])
- : +- LogicalTableScan(table=[[default_catalog,
default_database, T1]])
- +- LogicalWatermarkAssigner(fields=[id2, rowtime,
cnt, name, goods], rowtimeField=[rowtime], watermarkDelay=[0])
- +- LogicalTableScan(table=[[default_catalog,
default_database, T2]])
+ +- LogicalProject($f0=[HOP(TUMBLE_ROWTIME($0), 12000:INTERVAL SECOND,
4000:INTERVAL SECOND)], id1=[$1], text=[$2], $f3=[_UTF-16LE'*'])
+ +- LogicalAggregate(group=[{0, 1}], text=[LISTAGG($2, $3)])
+ +- LogicalProject($f0=[TUMBLE($1, 6000:INTERVAL SECOND)],
id1=[$0], text=[$2], $f3=[_UTF-16LE'#'])
+ +- LogicalFilter(condition=[AND(=($0, $3), >($1, -($4,
300000:INTERVAL MINUTE)), <($1, +($4, 180000:INTERVAL MINUTE)))])
+ +- LogicalJoin(condition=[true], joinType=[inner])
+ :- LogicalWatermarkAssigner(fields=[id1, rowtime, text],
rowtimeField=[rowtime], watermarkDelay=[0])
+ : +- LogicalTableScan(table=[[default_catalog,
default_database, T1]])
+ +- LogicalWatermarkAssigner(fields=[id2, rowtime, cnt,
name, goods], rowtimeField=[rowtime], watermarkDelay=[0])
+ +- LogicalTableScan(table=[[default_catalog,
default_database, T2]])
LogicalSink(name=[`default_catalog`.`default_database`.`appendSink2`],
fields=[a, b])
+- LogicalProject(id1=[$1], EXPR$1=[$2])
+- LogicalAggregate(group=[{0, 1}], EXPR$1=[LISTAGG($2, $3)])
+- LogicalProject($f0=[TUMBLE($1, 9000:INTERVAL SECOND)], id1=[$0],
text=[$2], $f3=[_UTF-16LE'-'])
- +- LogicalProject(id1=[$0], ts=[$1], text=[$2])
+ +- LogicalFilter(condition=[AND(=($0, $3), >($1, -($4,
300000:INTERVAL MINUTE)), <($1, +($4, 180000:INTERVAL MINUTE)))])
+ +- LogicalJoin(condition=[true], joinType=[inner])
+ :- LogicalWatermarkAssigner(fields=[id1, rowtime, text],
rowtimeField=[rowtime], watermarkDelay=[0])
+ : +- LogicalTableScan(table=[[default_catalog,
default_database, T1]])
+ +- LogicalWatermarkAssigner(fields=[id2, rowtime, cnt, name,
goods], rowtimeField=[rowtime], watermarkDelay=[0])
+ +- LogicalTableScan(table=[[default_catalog,
default_database, T2]])
+
+LogicalSink(name=[`default_catalog`.`default_database`.`appendSink3`],
fields=[a, b])
++- LogicalAggregate(group=[{0}], EXPR$1=[COUNT($1)])
+ +- LogicalProject(id1=[$1], text=[$2])
+ +- LogicalAggregate(group=[{0, 1}], text=[LISTAGG($2, $3)])
+ +- LogicalProject($f0=[TUMBLE($1, 6000:INTERVAL SECOND)], id1=[$0],
text=[$2], $f3=[_UTF-16LE'#'])
+- LogicalFilter(condition=[AND(=($0, $3), >($1, -($4,
300000:INTERVAL MINUTE)), <($1, +($4, 180000:INTERVAL MINUTE)))])
+- LogicalJoin(condition=[true], joinType=[inner])
:- LogicalWatermarkAssigner(fields=[id1, rowtime, text],
rowtimeField=[rowtime], watermarkDelay=[0])
: +- LogicalTableScan(table=[[default_catalog,
default_database, T1]])
+- LogicalWatermarkAssigner(fields=[id2, rowtime, cnt, name,
goods], rowtimeField=[rowtime], watermarkDelay=[0])
+- LogicalTableScan(table=[[default_catalog,
default_database, T2]])
-LogicalSink(name=[`default_catalog`.`default_database`.`appendSink3`],
fields=[a, b])
-+- LogicalAggregate(group=[{0}], EXPR$1=[COUNT($1)])
- +- LogicalProject(id1=[$0], text=[$1])
- +- LogicalProject(id1=[$1], text=[$2], ts=[TUMBLE_ROWTIME($0)])
- +- LogicalAggregate(group=[{0, 1}], text=[LISTAGG($2, $3)])
- +- LogicalProject($f0=[TUMBLE($1, 6000:INTERVAL SECOND)],
id1=[$0], text=[$2], $f3=[_UTF-16LE'#'])
- +- LogicalProject(id1=[$0], ts=[$1], text=[$2])
- +- LogicalFilter(condition=[AND(=($0, $3), >($1, -($4,
300000:INTERVAL MINUTE)), <($1, +($4, 180000:INTERVAL MINUTE)))])
- +- LogicalJoin(condition=[true], joinType=[inner])
- :- LogicalWatermarkAssigner(fields=[id1, rowtime,
text], rowtimeField=[rowtime], watermarkDelay=[0])
- : +- LogicalTableScan(table=[[default_catalog,
default_database, T1]])
- +- LogicalWatermarkAssigner(fields=[id2, rowtime, cnt,
name, goods], rowtimeField=[rowtime], watermarkDelay=[0])
- +- LogicalTableScan(table=[[default_catalog,
default_database, T2]])
-
== Optimized Logical Plan ==
-Calc(select=[id1, rowtime AS ts, text], reuse_id=[1])
-+- WindowJoin(joinType=[InnerJoin], windowBounds=[isRowTime=true,
leftLowerBound=-299999, leftUpperBound=179999, leftTimeIndex=1,
rightTimeIndex=1], where=[AND(=(id1, id2), >(CAST(rowtime), -(CAST(rowtime0),
300000:INTERVAL MINUTE)), <(CAST(rowtime), +(CAST(rowtime0), 180000:INTERVAL
MINUTE)))], select=[id1, rowtime, text, id2, rowtime0, cnt, name, goods])
- :- Exchange(distribution=[hash[id1]])
- : +- WatermarkAssigner(fields=[id1, rowtime, text],
rowtimeField=[rowtime], watermarkDelay=[0], miniBatchInterval=[None])
- : +- DataStreamScan(table=[[default_catalog, default_database, T1]],
fields=[id1, rowtime, text])
- +- Exchange(distribution=[hash[id2]])
- +- WatermarkAssigner(fields=[id2, rowtime, cnt, name, goods],
rowtimeField=[rowtime], watermarkDelay=[0], miniBatchInterval=[None])
- +- DataStreamScan(table=[[default_catalog, default_database, T2]],
fields=[id2, rowtime, cnt, name, goods])
-
-GroupWindowAggregate(groupBy=[id1], window=[TumblingGroupWindow('w$, ts,
6000)], properties=[w$start, w$end, w$rowtime, w$proctime], select=[id1,
LISTAGG(text, $f3) AS text, start('w$) AS w$start, end('w$) AS w$end,
rowtime('w$) AS w$rowtime, proctime('w$) AS w$proctime], reuse_id=[2])
-+- Exchange(distribution=[hash[id1]])
- +- Calc(select=[ts, id1, text, _UTF-16LE'#' AS $f3])
- +- Reused(reference_id=[1])
+WindowJoin(joinType=[InnerJoin], windowBounds=[isRowTime=true,
leftLowerBound=-299999, leftUpperBound=179999, leftTimeIndex=1,
rightTimeIndex=1], where=[AND(=(id1, id2), >(CAST(rowtime), -(CAST(rowtime0),
300000:INTERVAL MINUTE)), <(CAST(rowtime), +(CAST(rowtime0), 180000:INTERVAL
MINUTE)))], select=[id1, rowtime, text, id2, rowtime0, cnt, name, goods],
reuse_id=[1])
+:- Exchange(distribution=[hash[id1]])
+: +- WatermarkAssigner(fields=[id1, rowtime, text], rowtimeField=[rowtime],
watermarkDelay=[0], miniBatchInterval=[None])
+: +- DataStreamScan(table=[[default_catalog, default_database, T1]],
fields=[id1, rowtime, text])
++- Exchange(distribution=[hash[id2]])
+ +- WatermarkAssigner(fields=[id2, rowtime, cnt, name, goods],
rowtimeField=[rowtime], watermarkDelay=[0], miniBatchInterval=[None])
+ +- DataStreamScan(table=[[default_catalog, default_database, T2]],
fields=[id2, rowtime, cnt, name, goods])
+
+Exchange(distribution=[hash[id1]], reuse_id=[2])
++- Calc(select=[rowtime, id1, text, _UTF-16LE'#' AS $f3])
+ +- Reused(reference_id=[1])
Sink(name=[`default_catalog`.`default_database`.`appendSink1`], fields=[a, b])
-+- GroupWindowAggregate(groupBy=[id1], window=[SlidingGroupWindow('w$, ts,
4000, 12000)], select=[id1, LISTAGG(text, $f3) AS EXPR$1])
++- GroupWindowAggregate(groupBy=[id1], window=[SlidingGroupWindow('w$, $f0,
4000, 12000)], select=[id1, LISTAGG(text, $f3) AS EXPR$1])
+- Exchange(distribution=[hash[id1]])
- +- Calc(select=[w$rowtime AS ts, id1, text, _UTF-16LE'*' AS $f3])
- +- Reused(reference_id=[2])
+ +- Calc(select=[w$rowtime AS $f0, id1, text, _UTF-16LE'*' AS $f3])
+ +- GroupWindowAggregate(groupBy=[id1],
window=[TumblingGroupWindow('w$, rowtime, 6000)], properties=[w$start, w$end,
w$rowtime, w$proctime], select=[id1, LISTAGG(text, $f3) AS text, start('w$) AS
w$start, end('w$) AS w$end, rowtime('w$) AS w$rowtime, proctime('w$) AS
w$proctime])
+ +- Reused(reference_id=[2])
Sink(name=[`default_catalog`.`default_database`.`appendSink2`], fields=[a, b])
-+- GroupWindowAggregate(groupBy=[id1], window=[TumblingGroupWindow('w$, ts,
9000)], select=[id1, LISTAGG(text, $f3) AS EXPR$1])
++- GroupWindowAggregate(groupBy=[id1], window=[TumblingGroupWindow('w$,
rowtime, 9000)], select=[id1, LISTAGG(text, $f3) AS EXPR$1])
Review comment:
The name was changed because of CALCITE-2718.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services