danny0405 commented on a change in pull request #10123: [FLINK-14665][table-planner-blink] Support computed column for create… URL: https://github.com/apache/flink/pull/10123#discussion_r344602758
########## File path: flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/MiniBatchIntervalInferTest.xml ########## @@ -221,76 +221,70 @@ Calc(select=[a, b]) LogicalSink(name=[`default_catalog`.`default_database`.`appendSink1`], fields=[a, b]) +- LogicalProject(id1=[$1], EXPR$1=[$2]) +- LogicalAggregate(group=[{0, 1}], EXPR$1=[LISTAGG($2, $3)]) - +- LogicalProject($f0=[HOP($2, 12000:INTERVAL SECOND, 4000:INTERVAL SECOND)], id1=[$0], text=[$1], $f3=[_UTF-16LE'*']) - +- LogicalProject(id1=[$1], text=[$2], ts=[TUMBLE_ROWTIME($0)]) - +- LogicalAggregate(group=[{0, 1}], text=[LISTAGG($2, $3)]) - +- LogicalProject($f0=[TUMBLE($1, 6000:INTERVAL SECOND)], id1=[$0], text=[$2], $f3=[_UTF-16LE'#']) - +- LogicalProject(id1=[$0], ts=[$1], text=[$2]) - +- LogicalFilter(condition=[AND(=($0, $3), >($1, -($4, 300000:INTERVAL MINUTE)), <($1, +($4, 180000:INTERVAL MINUTE)))]) - +- LogicalJoin(condition=[true], joinType=[inner]) - :- LogicalWatermarkAssigner(fields=[id1, rowtime, text], rowtimeField=[rowtime], watermarkDelay=[0]) - : +- LogicalTableScan(table=[[default_catalog, default_database, T1]]) - +- LogicalWatermarkAssigner(fields=[id2, rowtime, cnt, name, goods], rowtimeField=[rowtime], watermarkDelay=[0]) - +- LogicalTableScan(table=[[default_catalog, default_database, T2]]) + +- LogicalProject($f0=[HOP(TUMBLE_ROWTIME($0), 12000:INTERVAL SECOND, 4000:INTERVAL SECOND)], id1=[$1], text=[$2], $f3=[_UTF-16LE'*']) + +- LogicalAggregate(group=[{0, 1}], text=[LISTAGG($2, $3)]) + +- LogicalProject($f0=[TUMBLE($1, 6000:INTERVAL SECOND)], id1=[$0], text=[$2], $f3=[_UTF-16LE'#']) + +- LogicalFilter(condition=[AND(=($0, $3), >($1, -($4, 300000:INTERVAL MINUTE)), <($1, +($4, 180000:INTERVAL MINUTE)))]) + +- LogicalJoin(condition=[true], joinType=[inner]) + :- LogicalWatermarkAssigner(fields=[id1, rowtime, text], rowtimeField=[rowtime], watermarkDelay=[0]) + : +- LogicalTableScan(table=[[default_catalog, default_database, T1]]) + +- LogicalWatermarkAssigner(fields=[id2, rowtime, cnt, name, goods], rowtimeField=[rowtime], watermarkDelay=[0]) + +- LogicalTableScan(table=[[default_catalog, default_database, T2]]) LogicalSink(name=[`default_catalog`.`default_database`.`appendSink2`], fields=[a, b]) +- LogicalProject(id1=[$1], EXPR$1=[$2]) +- LogicalAggregate(group=[{0, 1}], EXPR$1=[LISTAGG($2, $3)]) +- LogicalProject($f0=[TUMBLE($1, 9000:INTERVAL SECOND)], id1=[$0], text=[$2], $f3=[_UTF-16LE'-']) - +- LogicalProject(id1=[$0], ts=[$1], text=[$2]) + +- LogicalFilter(condition=[AND(=($0, $3), >($1, -($4, 300000:INTERVAL MINUTE)), <($1, +($4, 180000:INTERVAL MINUTE)))]) + +- LogicalJoin(condition=[true], joinType=[inner]) + :- LogicalWatermarkAssigner(fields=[id1, rowtime, text], rowtimeField=[rowtime], watermarkDelay=[0]) + : +- LogicalTableScan(table=[[default_catalog, default_database, T1]]) + +- LogicalWatermarkAssigner(fields=[id2, rowtime, cnt, name, goods], rowtimeField=[rowtime], watermarkDelay=[0]) + +- LogicalTableScan(table=[[default_catalog, default_database, T2]]) + +LogicalSink(name=[`default_catalog`.`default_database`.`appendSink3`], fields=[a, b]) ++- LogicalAggregate(group=[{0}], EXPR$1=[COUNT($1)]) + +- LogicalProject(id1=[$1], text=[$2]) + +- LogicalAggregate(group=[{0, 1}], text=[LISTAGG($2, $3)]) + +- LogicalProject($f0=[TUMBLE($1, 6000:INTERVAL SECOND)], id1=[$0], text=[$2], $f3=[_UTF-16LE'#']) +- LogicalFilter(condition=[AND(=($0, $3), >($1, -($4, 300000:INTERVAL MINUTE)), <($1, +($4, 180000:INTERVAL MINUTE)))]) +- LogicalJoin(condition=[true], joinType=[inner]) :- LogicalWatermarkAssigner(fields=[id1, rowtime, text], rowtimeField=[rowtime], watermarkDelay=[0]) : +- LogicalTableScan(table=[[default_catalog, default_database, T1]]) +- LogicalWatermarkAssigner(fields=[id2, rowtime, cnt, name, goods], rowtimeField=[rowtime], watermarkDelay=[0]) +- LogicalTableScan(table=[[default_catalog, default_database, T2]]) -LogicalSink(name=[`default_catalog`.`default_database`.`appendSink3`], fields=[a, b]) -+- LogicalAggregate(group=[{0}], EXPR$1=[COUNT($1)]) - +- LogicalProject(id1=[$0], text=[$1]) - +- LogicalProject(id1=[$1], text=[$2], ts=[TUMBLE_ROWTIME($0)]) - +- LogicalAggregate(group=[{0, 1}], text=[LISTAGG($2, $3)]) - +- LogicalProject($f0=[TUMBLE($1, 6000:INTERVAL SECOND)], id1=[$0], text=[$2], $f3=[_UTF-16LE'#']) - +- LogicalProject(id1=[$0], ts=[$1], text=[$2]) - +- LogicalFilter(condition=[AND(=($0, $3), >($1, -($4, 300000:INTERVAL MINUTE)), <($1, +($4, 180000:INTERVAL MINUTE)))]) - +- LogicalJoin(condition=[true], joinType=[inner]) - :- LogicalWatermarkAssigner(fields=[id1, rowtime, text], rowtimeField=[rowtime], watermarkDelay=[0]) - : +- LogicalTableScan(table=[[default_catalog, default_database, T1]]) - +- LogicalWatermarkAssigner(fields=[id2, rowtime, cnt, name, goods], rowtimeField=[rowtime], watermarkDelay=[0]) - +- LogicalTableScan(table=[[default_catalog, default_database, T2]]) - == Optimized Logical Plan == -Calc(select=[id1, rowtime AS ts, text], reuse_id=[1]) -+- WindowJoin(joinType=[InnerJoin], windowBounds=[isRowTime=true, leftLowerBound=-299999, leftUpperBound=179999, leftTimeIndex=1, rightTimeIndex=1], where=[AND(=(id1, id2), >(CAST(rowtime), -(CAST(rowtime0), 300000:INTERVAL MINUTE)), <(CAST(rowtime), +(CAST(rowtime0), 180000:INTERVAL MINUTE)))], select=[id1, rowtime, text, id2, rowtime0, cnt, name, goods]) - :- Exchange(distribution=[hash[id1]]) - : +- WatermarkAssigner(fields=[id1, rowtime, text], rowtimeField=[rowtime], watermarkDelay=[0], miniBatchInterval=[None]) - : +- DataStreamScan(table=[[default_catalog, default_database, T1]], fields=[id1, rowtime, text]) - +- Exchange(distribution=[hash[id2]]) - +- WatermarkAssigner(fields=[id2, rowtime, cnt, name, goods], rowtimeField=[rowtime], watermarkDelay=[0], miniBatchInterval=[None]) - +- DataStreamScan(table=[[default_catalog, default_database, T2]], fields=[id2, rowtime, cnt, name, goods]) - -GroupWindowAggregate(groupBy=[id1], window=[TumblingGroupWindow('w$, ts, 6000)], properties=[w$start, w$end, w$rowtime, w$proctime], select=[id1, LISTAGG(text, $f3) AS text, start('w$) AS w$start, end('w$) AS w$end, rowtime('w$) AS w$rowtime, proctime('w$) AS w$proctime], reuse_id=[2]) -+- Exchange(distribution=[hash[id1]]) - +- Calc(select=[ts, id1, text, _UTF-16LE'#' AS $f3]) - +- Reused(reference_id=[1]) +WindowJoin(joinType=[InnerJoin], windowBounds=[isRowTime=true, leftLowerBound=-299999, leftUpperBound=179999, leftTimeIndex=1, rightTimeIndex=1], where=[AND(=(id1, id2), >(CAST(rowtime), -(CAST(rowtime0), 300000:INTERVAL MINUTE)), <(CAST(rowtime), +(CAST(rowtime0), 180000:INTERVAL MINUTE)))], select=[id1, rowtime, text, id2, rowtime0, cnt, name, goods], reuse_id=[1]) +:- Exchange(distribution=[hash[id1]]) +: +- WatermarkAssigner(fields=[id1, rowtime, text], rowtimeField=[rowtime], watermarkDelay=[0], miniBatchInterval=[None]) +: +- DataStreamScan(table=[[default_catalog, default_database, T1]], fields=[id1, rowtime, text]) ++- Exchange(distribution=[hash[id2]]) + +- WatermarkAssigner(fields=[id2, rowtime, cnt, name, goods], rowtimeField=[rowtime], watermarkDelay=[0], miniBatchInterval=[None]) + +- DataStreamScan(table=[[default_catalog, default_database, T2]], fields=[id2, rowtime, cnt, name, goods]) + +Exchange(distribution=[hash[id1]], reuse_id=[2]) ++- Calc(select=[rowtime, id1, text, _UTF-16LE'#' AS $f3]) + +- Reused(reference_id=[1]) Sink(name=[`default_catalog`.`default_database`.`appendSink1`], fields=[a, b]) -+- GroupWindowAggregate(groupBy=[id1], window=[SlidingGroupWindow('w$, ts, 4000, 12000)], select=[id1, LISTAGG(text, $f3) AS EXPR$1]) ++- GroupWindowAggregate(groupBy=[id1], window=[SlidingGroupWindow('w$, $f0, 4000, 12000)], select=[id1, LISTAGG(text, $f3) AS EXPR$1]) +- Exchange(distribution=[hash[id1]]) - +- Calc(select=[w$rowtime AS ts, id1, text, _UTF-16LE'*' AS $f3]) - +- Reused(reference_id=[2]) + +- Calc(select=[w$rowtime AS $f0, id1, text, _UTF-16LE'*' AS $f3]) + +- GroupWindowAggregate(groupBy=[id1], window=[TumblingGroupWindow('w$, rowtime, 6000)], properties=[w$start, w$end, w$rowtime, w$proctime], select=[id1, LISTAGG(text, $f3) AS text, start('w$) AS w$start, end('w$) AS w$end, rowtime('w$) AS w$rowtime, proctime('w$) AS w$proctime]) + +- Reused(reference_id=[2]) Sink(name=[`default_catalog`.`default_database`.`appendSink2`], fields=[a, b]) -+- GroupWindowAggregate(groupBy=[id1], window=[TumblingGroupWindow('w$, ts, 9000)], select=[id1, LISTAGG(text, $f3) AS EXPR$1]) ++- GroupWindowAggregate(groupBy=[id1], window=[TumblingGroupWindow('w$, rowtime, 9000)], select=[id1, LISTAGG(text, $f3) AS EXPR$1]) Review comment: The name was changed because of CALCITE-2718. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services