danny0405 commented on a change in pull request #10123: 
[FLINK-14665][table-planner-blink] Support computed column for create…
URL: https://github.com/apache/flink/pull/10123#discussion_r344602758
 
 

 ##########
 File path: 
flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/MiniBatchIntervalInferTest.xml
 ##########
 @@ -221,76 +221,70 @@ Calc(select=[a, b])
 LogicalSink(name=[`default_catalog`.`default_database`.`appendSink1`], 
fields=[a, b])
 +- LogicalProject(id1=[$1], EXPR$1=[$2])
    +- LogicalAggregate(group=[{0, 1}], EXPR$1=[LISTAGG($2, $3)])
-      +- LogicalProject($f0=[HOP($2, 12000:INTERVAL SECOND, 4000:INTERVAL 
SECOND)], id1=[$0], text=[$1], $f3=[_UTF-16LE'*'])
-         +- LogicalProject(id1=[$1], text=[$2], ts=[TUMBLE_ROWTIME($0)])
-            +- LogicalAggregate(group=[{0, 1}], text=[LISTAGG($2, $3)])
-               +- LogicalProject($f0=[TUMBLE($1, 6000:INTERVAL SECOND)], 
id1=[$0], text=[$2], $f3=[_UTF-16LE'#'])
-                  +- LogicalProject(id1=[$0], ts=[$1], text=[$2])
-                     +- LogicalFilter(condition=[AND(=($0, $3), >($1, -($4, 
300000:INTERVAL MINUTE)), <($1, +($4, 180000:INTERVAL MINUTE)))])
-                        +- LogicalJoin(condition=[true], joinType=[inner])
-                           :- LogicalWatermarkAssigner(fields=[id1, rowtime, 
text], rowtimeField=[rowtime], watermarkDelay=[0])
-                           :  +- LogicalTableScan(table=[[default_catalog, 
default_database, T1]])
-                           +- LogicalWatermarkAssigner(fields=[id2, rowtime, 
cnt, name, goods], rowtimeField=[rowtime], watermarkDelay=[0])
-                              +- LogicalTableScan(table=[[default_catalog, 
default_database, T2]])
+      +- LogicalProject($f0=[HOP(TUMBLE_ROWTIME($0), 12000:INTERVAL SECOND, 
4000:INTERVAL SECOND)], id1=[$1], text=[$2], $f3=[_UTF-16LE'*'])
+         +- LogicalAggregate(group=[{0, 1}], text=[LISTAGG($2, $3)])
+            +- LogicalProject($f0=[TUMBLE($1, 6000:INTERVAL SECOND)], 
id1=[$0], text=[$2], $f3=[_UTF-16LE'#'])
+               +- LogicalFilter(condition=[AND(=($0, $3), >($1, -($4, 
300000:INTERVAL MINUTE)), <($1, +($4, 180000:INTERVAL MINUTE)))])
+                  +- LogicalJoin(condition=[true], joinType=[inner])
+                     :- LogicalWatermarkAssigner(fields=[id1, rowtime, text], 
rowtimeField=[rowtime], watermarkDelay=[0])
+                     :  +- LogicalTableScan(table=[[default_catalog, 
default_database, T1]])
+                     +- LogicalWatermarkAssigner(fields=[id2, rowtime, cnt, 
name, goods], rowtimeField=[rowtime], watermarkDelay=[0])
+                        +- LogicalTableScan(table=[[default_catalog, 
default_database, T2]])
 
 LogicalSink(name=[`default_catalog`.`default_database`.`appendSink2`], 
fields=[a, b])
 +- LogicalProject(id1=[$1], EXPR$1=[$2])
    +- LogicalAggregate(group=[{0, 1}], EXPR$1=[LISTAGG($2, $3)])
       +- LogicalProject($f0=[TUMBLE($1, 9000:INTERVAL SECOND)], id1=[$0], 
text=[$2], $f3=[_UTF-16LE'-'])
-         +- LogicalProject(id1=[$0], ts=[$1], text=[$2])
+         +- LogicalFilter(condition=[AND(=($0, $3), >($1, -($4, 
300000:INTERVAL MINUTE)), <($1, +($4, 180000:INTERVAL MINUTE)))])
+            +- LogicalJoin(condition=[true], joinType=[inner])
+               :- LogicalWatermarkAssigner(fields=[id1, rowtime, text], 
rowtimeField=[rowtime], watermarkDelay=[0])
+               :  +- LogicalTableScan(table=[[default_catalog, 
default_database, T1]])
+               +- LogicalWatermarkAssigner(fields=[id2, rowtime, cnt, name, 
goods], rowtimeField=[rowtime], watermarkDelay=[0])
+                  +- LogicalTableScan(table=[[default_catalog, 
default_database, T2]])
+
+LogicalSink(name=[`default_catalog`.`default_database`.`appendSink3`], 
fields=[a, b])
++- LogicalAggregate(group=[{0}], EXPR$1=[COUNT($1)])
+   +- LogicalProject(id1=[$1], text=[$2])
+      +- LogicalAggregate(group=[{0, 1}], text=[LISTAGG($2, $3)])
+         +- LogicalProject($f0=[TUMBLE($1, 6000:INTERVAL SECOND)], id1=[$0], 
text=[$2], $f3=[_UTF-16LE'#'])
             +- LogicalFilter(condition=[AND(=($0, $3), >($1, -($4, 
300000:INTERVAL MINUTE)), <($1, +($4, 180000:INTERVAL MINUTE)))])
                +- LogicalJoin(condition=[true], joinType=[inner])
                   :- LogicalWatermarkAssigner(fields=[id1, rowtime, text], 
rowtimeField=[rowtime], watermarkDelay=[0])
                   :  +- LogicalTableScan(table=[[default_catalog, 
default_database, T1]])
                   +- LogicalWatermarkAssigner(fields=[id2, rowtime, cnt, name, 
goods], rowtimeField=[rowtime], watermarkDelay=[0])
                      +- LogicalTableScan(table=[[default_catalog, 
default_database, T2]])
 
-LogicalSink(name=[`default_catalog`.`default_database`.`appendSink3`], 
fields=[a, b])
-+- LogicalAggregate(group=[{0}], EXPR$1=[COUNT($1)])
-   +- LogicalProject(id1=[$0], text=[$1])
-      +- LogicalProject(id1=[$1], text=[$2], ts=[TUMBLE_ROWTIME($0)])
-         +- LogicalAggregate(group=[{0, 1}], text=[LISTAGG($2, $3)])
-            +- LogicalProject($f0=[TUMBLE($1, 6000:INTERVAL SECOND)], 
id1=[$0], text=[$2], $f3=[_UTF-16LE'#'])
-               +- LogicalProject(id1=[$0], ts=[$1], text=[$2])
-                  +- LogicalFilter(condition=[AND(=($0, $3), >($1, -($4, 
300000:INTERVAL MINUTE)), <($1, +($4, 180000:INTERVAL MINUTE)))])
-                     +- LogicalJoin(condition=[true], joinType=[inner])
-                        :- LogicalWatermarkAssigner(fields=[id1, rowtime, 
text], rowtimeField=[rowtime], watermarkDelay=[0])
-                        :  +- LogicalTableScan(table=[[default_catalog, 
default_database, T1]])
-                        +- LogicalWatermarkAssigner(fields=[id2, rowtime, cnt, 
name, goods], rowtimeField=[rowtime], watermarkDelay=[0])
-                           +- LogicalTableScan(table=[[default_catalog, 
default_database, T2]])
-
 == Optimized Logical Plan ==
-Calc(select=[id1, rowtime AS ts, text], reuse_id=[1])
-+- WindowJoin(joinType=[InnerJoin], windowBounds=[isRowTime=true, 
leftLowerBound=-299999, leftUpperBound=179999, leftTimeIndex=1, 
rightTimeIndex=1], where=[AND(=(id1, id2), >(CAST(rowtime), -(CAST(rowtime0), 
300000:INTERVAL MINUTE)), <(CAST(rowtime), +(CAST(rowtime0), 180000:INTERVAL 
MINUTE)))], select=[id1, rowtime, text, id2, rowtime0, cnt, name, goods])
-   :- Exchange(distribution=[hash[id1]])
-   :  +- WatermarkAssigner(fields=[id1, rowtime, text], 
rowtimeField=[rowtime], watermarkDelay=[0], miniBatchInterval=[None])
-   :     +- DataStreamScan(table=[[default_catalog, default_database, T1]], 
fields=[id1, rowtime, text])
-   +- Exchange(distribution=[hash[id2]])
-      +- WatermarkAssigner(fields=[id2, rowtime, cnt, name, goods], 
rowtimeField=[rowtime], watermarkDelay=[0], miniBatchInterval=[None])
-         +- DataStreamScan(table=[[default_catalog, default_database, T2]], 
fields=[id2, rowtime, cnt, name, goods])
-
-GroupWindowAggregate(groupBy=[id1], window=[TumblingGroupWindow('w$, ts, 
6000)], properties=[w$start, w$end, w$rowtime, w$proctime], select=[id1, 
LISTAGG(text, $f3) AS text, start('w$) AS w$start, end('w$) AS w$end, 
rowtime('w$) AS w$rowtime, proctime('w$) AS w$proctime], reuse_id=[2])
-+- Exchange(distribution=[hash[id1]])
-   +- Calc(select=[ts, id1, text, _UTF-16LE'#' AS $f3])
-      +- Reused(reference_id=[1])
+WindowJoin(joinType=[InnerJoin], windowBounds=[isRowTime=true, 
leftLowerBound=-299999, leftUpperBound=179999, leftTimeIndex=1, 
rightTimeIndex=1], where=[AND(=(id1, id2), >(CAST(rowtime), -(CAST(rowtime0), 
300000:INTERVAL MINUTE)), <(CAST(rowtime), +(CAST(rowtime0), 180000:INTERVAL 
MINUTE)))], select=[id1, rowtime, text, id2, rowtime0, cnt, name, goods], 
reuse_id=[1])
+:- Exchange(distribution=[hash[id1]])
+:  +- WatermarkAssigner(fields=[id1, rowtime, text], rowtimeField=[rowtime], 
watermarkDelay=[0], miniBatchInterval=[None])
+:     +- DataStreamScan(table=[[default_catalog, default_database, T1]], 
fields=[id1, rowtime, text])
++- Exchange(distribution=[hash[id2]])
+   +- WatermarkAssigner(fields=[id2, rowtime, cnt, name, goods], 
rowtimeField=[rowtime], watermarkDelay=[0], miniBatchInterval=[None])
+      +- DataStreamScan(table=[[default_catalog, default_database, T2]], 
fields=[id2, rowtime, cnt, name, goods])
+
+Exchange(distribution=[hash[id1]], reuse_id=[2])
++- Calc(select=[rowtime, id1, text, _UTF-16LE'#' AS $f3])
+   +- Reused(reference_id=[1])
 
 Sink(name=[`default_catalog`.`default_database`.`appendSink1`], fields=[a, b])
-+- GroupWindowAggregate(groupBy=[id1], window=[SlidingGroupWindow('w$, ts, 
4000, 12000)], select=[id1, LISTAGG(text, $f3) AS EXPR$1])
++- GroupWindowAggregate(groupBy=[id1], window=[SlidingGroupWindow('w$, $f0, 
4000, 12000)], select=[id1, LISTAGG(text, $f3) AS EXPR$1])
    +- Exchange(distribution=[hash[id1]])
-      +- Calc(select=[w$rowtime AS ts, id1, text, _UTF-16LE'*' AS $f3])
-         +- Reused(reference_id=[2])
+      +- Calc(select=[w$rowtime AS $f0, id1, text, _UTF-16LE'*' AS $f3])
+         +- GroupWindowAggregate(groupBy=[id1], 
window=[TumblingGroupWindow('w$, rowtime, 6000)], properties=[w$start, w$end, 
w$rowtime, w$proctime], select=[id1, LISTAGG(text, $f3) AS text, start('w$) AS 
w$start, end('w$) AS w$end, rowtime('w$) AS w$rowtime, proctime('w$) AS 
w$proctime])
+            +- Reused(reference_id=[2])
 
 Sink(name=[`default_catalog`.`default_database`.`appendSink2`], fields=[a, b])
-+- GroupWindowAggregate(groupBy=[id1], window=[TumblingGroupWindow('w$, ts, 
9000)], select=[id1, LISTAGG(text, $f3) AS EXPR$1])
++- GroupWindowAggregate(groupBy=[id1], window=[TumblingGroupWindow('w$, 
rowtime, 9000)], select=[id1, LISTAGG(text, $f3) AS EXPR$1])
 
 Review comment:
   The name was changed because of CALCITE-2718.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

Reply via email to