This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 617fcaa60743cf48afb43c867450ec5e5aa1d846 Author: Jark Wu <[email protected]> AuthorDate: Sat Mar 11 23:22:16 2023 +0800 [hotfix][table] Fix plan tests for the new PushCalcPastChangelogNormalizeRule --- .../rules/physical/stream/ChangelogModeInferenceTest.xml | 6 +++--- .../table/planner/plan/stream/sql/TableScanTest.xml | 16 +++++++++------- 2 files changed, 12 insertions(+), 10 deletions(-) diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/rules/physical/stream/ChangelogModeInferenceTest.xml b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/rules/physical/stream/ChangelogModeInferenceTest.xml index 0d3781aff6a..2d0a54e7d86 100644 --- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/rules/physical/stream/ChangelogModeInferenceTest.xml +++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/rules/physical/stream/ChangelogModeInferenceTest.xml @@ -468,9 +468,9 @@ Sink(table=[default_catalog.default_database.all_change_sink_table], fields=[id, : +- Exchange(distribution=[hash[id]], changelogMode=[I,UA,D]) : +- TableSourceScan(table=[[default_catalog, default_database, upsert_managed_table]], fields=[id, col1, col2], changelogMode=[I,UA,D]) +- Exchange(distribution=[hash[id]], changelogMode=[I,UB,UA,D]) - +- Calc(select=[id], changelogMode=[I,UB,UA,D]) - +- ChangelogNormalize(key=[id], changelogMode=[I,UB,UA,D]) - +- Exchange(distribution=[hash[id]], changelogMode=[I,UA,D]) + +- ChangelogNormalize(key=[id], changelogMode=[I,UB,UA,D]) + +- Exchange(distribution=[hash[id]], changelogMode=[I,UA,D]) + +- Calc(select=[id], changelogMode=[I,UA,D]) +- TableSourceScan(table=[[default_catalog, default_database, upsert_managed_table]], fields=[id, col1, col2], changelogMode=[I,UA,D]) ]]> </Resource> diff --git 
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/TableScanTest.xml b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/TableScanTest.xml index 4c24113bad7..e24c0ca5b38 100644 --- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/TableScanTest.xml +++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/TableScanTest.xml @@ -98,10 +98,11 @@ LogicalProject(a=[$1], b=[$2], c=[$3]) Calc(select=[a, b, c], where=[>(a, 1)], changelogMode=[I,UB,UA,D]) +- ChangelogNormalize(key=[id], changelogMode=[I,UB,UA,D]) +- Exchange(distribution=[hash[id]], changelogMode=[I,UA,D]) - +- WatermarkAssigner(rowtime=[ts], watermark=[-(ts, 1000:INTERVAL SECOND)], changelogMode=[I,UA,D]) - +- Calc(select=[id, a, +(a, 1) AS b, c, TO_TIMESTAMP(c) AS ts], changelogMode=[I,UA,D]) - +- DropUpdateBefore(changelogMode=[I,UA,D]) - +- TableSourceScan(table=[[default_catalog, default_database, src]], fields=[id, a, c], changelogMode=[I,UB,UA,D]) + +- Calc(select=[id, a, b, c], changelogMode=[I,UA,D]) + +- WatermarkAssigner(rowtime=[ts], watermark=[-(ts, 1000:INTERVAL SECOND)], changelogMode=[I,UA,D]) + +- Calc(select=[id, a, +(a, 1) AS b, c, TO_TIMESTAMP(c) AS ts], changelogMode=[I,UA,D]) + +- DropUpdateBefore(changelogMode=[I,UA,D]) + +- TableSourceScan(table=[[default_catalog, default_database, src]], fields=[id, a, c], changelogMode=[I,UB,UA,D]) ]]> </Resource> </TestCase> @@ -630,9 +631,10 @@ LogicalProject(a=[$1], b=[$2], c=[$3]) Calc(select=[a, b, c], where=[>(a, 1)], changelogMode=[I,UB,UA,D]) +- ChangelogNormalize(key=[id], changelogMode=[I,UB,UA,D]) +- Exchange(distribution=[hash[id]], changelogMode=[UA,D]) - +- WatermarkAssigner(rowtime=[ts], watermark=[-(ts, 1000:INTERVAL SECOND)], changelogMode=[UA,D]) - +- Calc(select=[id, a, +(a, 1) AS b, c, TO_TIMESTAMP(c) AS ts], changelogMode=[UA,D]) - +- 
TableSourceScan(table=[[default_catalog, default_database, src]], fields=[id, a, c], changelogMode=[UA,D]) + +- Calc(select=[id, a, b, c], changelogMode=[UA,D]) + +- WatermarkAssigner(rowtime=[ts], watermark=[-(ts, 1000:INTERVAL SECOND)], changelogMode=[UA,D]) + +- Calc(select=[id, a, +(a, 1) AS b, c, TO_TIMESTAMP(c) AS ts], changelogMode=[UA,D]) + +- TableSourceScan(table=[[default_catalog, default_database, src]], fields=[id, a, c], changelogMode=[UA,D]) ]]> </Resource> </TestCase>
