snuyanzin commented on code in PR #22558:
URL: https://github.com/apache/flink/pull/22558#discussion_r1239034473
##########
flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/DagOptimizationTest.xml:
##########
@@ -379,30 +382,30 @@
LogicalLegacySink(name=[`default_catalog`.`default_database`.`retractSink2`], fi
<![CDATA[
LegacySink(name=[`default_catalog`.`default_database`.`retractSink1`],
fields=[a, total_c], changelogMode=[NONE])
+- Union(all=[true], union=[a, total_c], changelogMode=[I,UB,UA])
- :- Calc(select=[a, total_c], where=[>(a, 50)], changelogMode=[I,UB,UA])
- : +- Calc(select=[a, total_c], changelogMode=[I,UB,UA])
- : +- GroupAggregate(groupBy=[a, b], select=[a, b, COUNT(DISTINCT c) AS
total_c], changelogMode=[I,UB,UA])
- : +- Exchange(distribution=[hash[a, b]], changelogMode=[I])
- : +- Calc(select=[a, b, f0 AS c], changelogMode=[I])
- : +- Correlate(invocation=[split($cor0.c)],
correlate=[table(split($cor0.c))], select=[a,b,c,f0],
rowType=[RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c,
VARCHAR(2147483647) f0)], joinType=[INNER], changelogMode=[I])
- : +- Calc(select=[a, -(b, MOD(b, 300)) AS b, c],
where=[AND(>=(b, UNIX_TIMESTAMP('${startTime}')), <>(c, ''))],
changelogMode=[I])
+ :- Calc(select=[a, total_c], changelogMode=[I,UB,UA])
+ : +- GroupAggregate(groupBy=[a, b], select=[a, b, COUNT(DISTINCT f0) AS
total_c], changelogMode=[I,UB,UA])
+ : +- Exchange(distribution=[hash[a, b]], changelogMode=[I])
+ : +- Calc(select=[a, b, f0], where=[>(a, 50)], changelogMode=[I])
+ : +- Correlate(invocation=[split($cor0.c)],
correlate=[table(split($cor0.c))], select=[a,b,c,f0],
rowType=[RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c,
VARCHAR(2147483647) f0)], joinType=[INNER], changelogMode=[I])
+ : +- Calc(select=[a, -(b, MOD(b, 300)) AS b, c], where=[<>(c,
'')], changelogMode=[I])
+ : +- Calc(select=[a, b, c], where=[>=(b,
UNIX_TIMESTAMP('${startTime}'))], changelogMode=[I])
: +- LegacyTableSourceScan(table=[[default_catalog,
default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b,
c], changelogMode=[I])
- +- Calc(select=[a, CAST(total_c AS BIGINT) AS total_c], where=[>(a, 50)],
changelogMode=[I])
- +- Calc(select=[a, 0 AS total_c], where=[>=(b,
UNIX_TIMESTAMP('${startTime}'))], changelogMode=[I])
+ +- Calc(select=[a, 0 AS total_c], where=[>(a, 50)], changelogMode=[I])
+ +- Calc(select=[a, b, c], where=[>=(b, UNIX_TIMESTAMP('${startTime}'))],
changelogMode=[I])
+- LegacyTableSourceScan(table=[[default_catalog, default_database,
MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c],
changelogMode=[I])
LegacySink(name=[`default_catalog`.`default_database`.`retractSink2`],
fields=[a, total_c], changelogMode=[NONE])
+- Union(all=[true], union=[a, total_c], changelogMode=[I,UB,UA])
- :- Calc(select=[a, total_c], where=[<(a, 50)], changelogMode=[I,UB,UA])
- : +- Calc(select=[a, total_c], changelogMode=[I,UB,UA])
- : +- GroupAggregate(groupBy=[a, b], select=[a, b, COUNT(DISTINCT c) AS
total_c], changelogMode=[I,UB,UA])
- : +- Exchange(distribution=[hash[a, b]], changelogMode=[I])
- : +- Calc(select=[a, b, f0 AS c], changelogMode=[I])
- : +- Correlate(invocation=[split($cor0.c)],
correlate=[table(split($cor0.c))], select=[a,b,c,f0],
rowType=[RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c,
VARCHAR(2147483647) f0)], joinType=[INNER], changelogMode=[I])
- : +- Calc(select=[a, -(b, MOD(b, 300)) AS b, c],
where=[AND(>=(b, UNIX_TIMESTAMP('${startTime}')), <>(c, ''))],
changelogMode=[I])
+ :- Calc(select=[a, total_c], changelogMode=[I,UB,UA])
+ : +- GroupAggregate(groupBy=[a, b], select=[a, b, COUNT(DISTINCT f0) AS
total_c], changelogMode=[I,UB,UA])
+ : +- Exchange(distribution=[hash[a, b]], changelogMode=[I])
+ : +- Calc(select=[a, b, f0], where=[<(a, 50)], changelogMode=[I])
+ : +- Correlate(invocation=[split($cor0.c)],
correlate=[table(split($cor0.c))], select=[a,b,c,f0],
rowType=[RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c,
VARCHAR(2147483647) f0)], joinType=[INNER], changelogMode=[I])
+ : +- Calc(select=[a, -(b, MOD(b, 300)) AS b, c], where=[<>(c,
'')], changelogMode=[I])
+ : +- Calc(select=[a, b, c], where=[>=(b,
UNIX_TIMESTAMP('${startTime}'))], changelogMode=[I])
: +- LegacyTableSourceScan(table=[[default_catalog,
default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b,
c], changelogMode=[I])
- +- Calc(select=[a, CAST(total_c AS BIGINT) AS total_c], where=[<(a, 50)],
changelogMode=[I])
- +- Calc(select=[a, 0 AS total_c], where=[>=(b,
UNIX_TIMESTAMP('${startTime}'))], changelogMode=[I])
+ +- Calc(select=[a, 0 AS total_c], where=[<(a, 50)], changelogMode=[I])
+ +- Calc(select=[a, b, c], where=[>=(b, UNIX_TIMESTAMP('${startTime}'))],
changelogMode=[I])
Review Comment:
After `AddJsonTypeOperatorEnabled` was set to `false`, no changes to this file
are required, so the file was reverted.
##########
flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/DagOptimizationTest.xml:
##########
@@ -478,33 +483,37 @@
LogicalLegacySink(name=[`default_catalog`.`default_database`.`sink3`], fields=[t
</Resource>
<Resource name="optimized exec plan">
<![CDATA[
-Union(all=[true], union=[a, c])(reuse_id=[1])
-:- Calc(select=[a, c])
-: +- LegacyTableSourceScan(table=[[default_catalog, default_database,
MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
-+- Calc(select=[d, f])
- +- LegacyTableSourceScan(table=[[default_catalog, default_database,
MyTable1, source: [TestTableSource(d, e, f)]]], fields=[d, e, f])
+LegacyTableSourceScan(table=[[default_catalog, default_database, MyTable,
source: [TestTableSource(a, b, c)]]], fields=[a, b, c])(reuse_id=[1])
+
+LegacyTableSourceScan(table=[[default_catalog, default_database, MyTable1,
source: [TestTableSource(d, e, f)]]], fields=[d, e, f])(reuse_id=[2])
LegacySink(name=[`default_catalog`.`default_database`.`sink1`], fields=[a, c])
-+- Reused(reference_id=[1])
++- Union(all=[true], union=[a, c])
+ :- Calc(select=[a, c])
+ : +- Reused(reference_id=[1])
+ +- Calc(select=[d, f])
Review Comment:
After `AddJsonTypeOperatorEnabled` was set to `false`, no changes to this file
are required, so the file was reverted.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.