snuyanzin commented on code in PR #22558:
URL: https://github.com/apache/flink/pull/22558#discussion_r1239034473


##########
flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/DagOptimizationTest.xml:
##########
@@ -379,30 +382,30 @@ 
LogicalLegacySink(name=[`default_catalog`.`default_database`.`retractSink2`], fi
       <![CDATA[
 LegacySink(name=[`default_catalog`.`default_database`.`retractSink1`], 
fields=[a, total_c], changelogMode=[NONE])
 +- Union(all=[true], union=[a, total_c], changelogMode=[I,UB,UA])
-   :- Calc(select=[a, total_c], where=[>(a, 50)], changelogMode=[I,UB,UA])
-   :  +- Calc(select=[a, total_c], changelogMode=[I,UB,UA])
-   :     +- GroupAggregate(groupBy=[a, b], select=[a, b, COUNT(DISTINCT c) AS 
total_c], changelogMode=[I,UB,UA])
-   :        +- Exchange(distribution=[hash[a, b]], changelogMode=[I])
-   :           +- Calc(select=[a, b, f0 AS c], changelogMode=[I])
-   :              +- Correlate(invocation=[split($cor0.c)], 
correlate=[table(split($cor0.c))], select=[a,b,c,f0], 
rowType=[RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, 
VARCHAR(2147483647) f0)], joinType=[INNER], changelogMode=[I])
-   :                 +- Calc(select=[a, -(b, MOD(b, 300)) AS b, c], 
where=[AND(>=(b, UNIX_TIMESTAMP('${startTime}')), <>(c, ''))], 
changelogMode=[I])
+   :- Calc(select=[a, total_c], changelogMode=[I,UB,UA])
+   :  +- GroupAggregate(groupBy=[a, b], select=[a, b, COUNT(DISTINCT f0) AS 
total_c], changelogMode=[I,UB,UA])
+   :     +- Exchange(distribution=[hash[a, b]], changelogMode=[I])
+   :        +- Calc(select=[a, b, f0], where=[>(a, 50)], changelogMode=[I])
+   :           +- Correlate(invocation=[split($cor0.c)], 
correlate=[table(split($cor0.c))], select=[a,b,c,f0], 
rowType=[RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, 
VARCHAR(2147483647) f0)], joinType=[INNER], changelogMode=[I])
+   :              +- Calc(select=[a, -(b, MOD(b, 300)) AS b, c], where=[<>(c, 
'')], changelogMode=[I])
+   :                 +- Calc(select=[a, b, c], where=[>=(b, 
UNIX_TIMESTAMP('${startTime}'))], changelogMode=[I])
    :                    +- LegacyTableSourceScan(table=[[default_catalog, 
default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, 
c], changelogMode=[I])
-   +- Calc(select=[a, CAST(total_c AS BIGINT) AS total_c], where=[>(a, 50)], 
changelogMode=[I])
-      +- Calc(select=[a, 0 AS total_c], where=[>=(b, 
UNIX_TIMESTAMP('${startTime}'))], changelogMode=[I])
+   +- Calc(select=[a, 0 AS total_c], where=[>(a, 50)], changelogMode=[I])
+      +- Calc(select=[a, b, c], where=[>=(b, UNIX_TIMESTAMP('${startTime}'))], 
changelogMode=[I])
          +- LegacyTableSourceScan(table=[[default_catalog, default_database, 
MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c], 
changelogMode=[I])
 
 LegacySink(name=[`default_catalog`.`default_database`.`retractSink2`], 
fields=[a, total_c], changelogMode=[NONE])
 +- Union(all=[true], union=[a, total_c], changelogMode=[I,UB,UA])
-   :- Calc(select=[a, total_c], where=[<(a, 50)], changelogMode=[I,UB,UA])
-   :  +- Calc(select=[a, total_c], changelogMode=[I,UB,UA])
-   :     +- GroupAggregate(groupBy=[a, b], select=[a, b, COUNT(DISTINCT c) AS 
total_c], changelogMode=[I,UB,UA])
-   :        +- Exchange(distribution=[hash[a, b]], changelogMode=[I])
-   :           +- Calc(select=[a, b, f0 AS c], changelogMode=[I])
-   :              +- Correlate(invocation=[split($cor0.c)], 
correlate=[table(split($cor0.c))], select=[a,b,c,f0], 
rowType=[RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, 
VARCHAR(2147483647) f0)], joinType=[INNER], changelogMode=[I])
-   :                 +- Calc(select=[a, -(b, MOD(b, 300)) AS b, c], 
where=[AND(>=(b, UNIX_TIMESTAMP('${startTime}')), <>(c, ''))], 
changelogMode=[I])
+   :- Calc(select=[a, total_c], changelogMode=[I,UB,UA])
+   :  +- GroupAggregate(groupBy=[a, b], select=[a, b, COUNT(DISTINCT f0) AS 
total_c], changelogMode=[I,UB,UA])
+   :     +- Exchange(distribution=[hash[a, b]], changelogMode=[I])
+   :        +- Calc(select=[a, b, f0], where=[<(a, 50)], changelogMode=[I])
+   :           +- Correlate(invocation=[split($cor0.c)], 
correlate=[table(split($cor0.c))], select=[a,b,c,f0], 
rowType=[RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, 
VARCHAR(2147483647) f0)], joinType=[INNER], changelogMode=[I])
+   :              +- Calc(select=[a, -(b, MOD(b, 300)) AS b, c], where=[<>(c, 
'')], changelogMode=[I])
+   :                 +- Calc(select=[a, b, c], where=[>=(b, 
UNIX_TIMESTAMP('${startTime}'))], changelogMode=[I])
    :                    +- LegacyTableSourceScan(table=[[default_catalog, 
default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, 
c], changelogMode=[I])
-   +- Calc(select=[a, CAST(total_c AS BIGINT) AS total_c], where=[<(a, 50)], 
changelogMode=[I])
-      +- Calc(select=[a, 0 AS total_c], where=[>=(b, 
UNIX_TIMESTAMP('${startTime}'))], changelogMode=[I])
+   +- Calc(select=[a, 0 AS total_c], where=[<(a, 50)], changelogMode=[I])
+      +- Calc(select=[a, b, c], where=[>=(b, UNIX_TIMESTAMP('${startTime}'))], 
changelogMode=[I])

Review Comment:
   After `AddJsonTypeOperatorEnabled` was set to `false`, no changes to this 
file are required, so the file was reverted.



##########
flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/DagOptimizationTest.xml:
##########
@@ -478,33 +483,37 @@ 
LogicalLegacySink(name=[`default_catalog`.`default_database`.`sink3`], fields=[t
     </Resource>
     <Resource name="optimized exec plan">
       <![CDATA[
-Union(all=[true], union=[a, c])(reuse_id=[1])
-:- Calc(select=[a, c])
-:  +- LegacyTableSourceScan(table=[[default_catalog, default_database, 
MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
-+- Calc(select=[d, f])
-   +- LegacyTableSourceScan(table=[[default_catalog, default_database, 
MyTable1, source: [TestTableSource(d, e, f)]]], fields=[d, e, f])
+LegacyTableSourceScan(table=[[default_catalog, default_database, MyTable, 
source: [TestTableSource(a, b, c)]]], fields=[a, b, c])(reuse_id=[1])
+
+LegacyTableSourceScan(table=[[default_catalog, default_database, MyTable1, 
source: [TestTableSource(d, e, f)]]], fields=[d, e, f])(reuse_id=[2])
 
 LegacySink(name=[`default_catalog`.`default_database`.`sink1`], fields=[a, c])
-+- Reused(reference_id=[1])
++- Union(all=[true], union=[a, c])
+   :- Calc(select=[a, c])
+   :  +- Reused(reference_id=[1])
+   +- Calc(select=[d, f])

Review Comment:
   After `AddJsonTypeOperatorEnabled` was set to `false`, no changes to this 
file are required, so the file was reverted.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to