This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 617fcaa60743cf48afb43c867450ec5e5aa1d846
Author: Jark Wu <[email protected]>
AuthorDate: Sat Mar 11 23:22:16 2023 +0800

    [hotfix][table] Fix plan tests for the new PushCalcPastChangelogNormalizeRule
---
 .../rules/physical/stream/ChangelogModeInferenceTest.xml |  6 +++---
 .../table/planner/plan/stream/sql/TableScanTest.xml      | 16 +++++++++-------
 2 files changed, 12 insertions(+), 10 deletions(-)

diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/rules/physical/stream/ChangelogModeInferenceTest.xml b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/rules/physical/stream/ChangelogModeInferenceTest.xml
index 0d3781aff6a..2d0a54e7d86 100644
--- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/rules/physical/stream/ChangelogModeInferenceTest.xml
+++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/rules/physical/stream/ChangelogModeInferenceTest.xml
@@ -468,9 +468,9 @@ Sink(table=[default_catalog.default_database.all_change_sink_table], fields=[id,
       :     +- Exchange(distribution=[hash[id]], changelogMode=[I,UA,D])
       :        +- TableSourceScan(table=[[default_catalog, default_database, upsert_managed_table]], fields=[id, col1, col2], changelogMode=[I,UA,D])
       +- Exchange(distribution=[hash[id]], changelogMode=[I,UB,UA,D])
-         +- Calc(select=[id], changelogMode=[I,UB,UA,D])
-            +- ChangelogNormalize(key=[id], changelogMode=[I,UB,UA,D])
-               +- Exchange(distribution=[hash[id]], changelogMode=[I,UA,D])
+         +- ChangelogNormalize(key=[id], changelogMode=[I,UB,UA,D])
+            +- Exchange(distribution=[hash[id]], changelogMode=[I,UA,D])
+               +- Calc(select=[id], changelogMode=[I,UA,D])
                   +- TableSourceScan(table=[[default_catalog, default_database, upsert_managed_table]], fields=[id, col1, col2], changelogMode=[I,UA,D])
 ]]>
                </Resource>
diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/TableScanTest.xml b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/TableScanTest.xml
index 4c24113bad7..e24c0ca5b38 100644
--- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/TableScanTest.xml
+++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/TableScanTest.xml
@@ -98,10 +98,11 @@ LogicalProject(a=[$1], b=[$2], c=[$3])
 Calc(select=[a, b, c], where=[>(a, 1)], changelogMode=[I,UB,UA,D])
 +- ChangelogNormalize(key=[id], changelogMode=[I,UB,UA,D])
    +- Exchange(distribution=[hash[id]], changelogMode=[I,UA,D])
-      +- WatermarkAssigner(rowtime=[ts], watermark=[-(ts, 1000:INTERVAL SECOND)], changelogMode=[I,UA,D])
-         +- Calc(select=[id, a, +(a, 1) AS b, c, TO_TIMESTAMP(c) AS ts], changelogMode=[I,UA,D])
-            +- DropUpdateBefore(changelogMode=[I,UA,D])
-               +- TableSourceScan(table=[[default_catalog, default_database, src]], fields=[id, a, c], changelogMode=[I,UB,UA,D])
+      +- Calc(select=[id, a, b, c], changelogMode=[I,UA,D])
+         +- WatermarkAssigner(rowtime=[ts], watermark=[-(ts, 1000:INTERVAL SECOND)], changelogMode=[I,UA,D])
+            +- Calc(select=[id, a, +(a, 1) AS b, c, TO_TIMESTAMP(c) AS ts], changelogMode=[I,UA,D])
+               +- DropUpdateBefore(changelogMode=[I,UA,D])
+                  +- TableSourceScan(table=[[default_catalog, default_database, src]], fields=[id, a, c], changelogMode=[I,UB,UA,D])
 ]]>
     </Resource>
   </TestCase>
@@ -630,9 +631,10 @@ LogicalProject(a=[$1], b=[$2], c=[$3])
 Calc(select=[a, b, c], where=[>(a, 1)], changelogMode=[I,UB,UA,D])
 +- ChangelogNormalize(key=[id], changelogMode=[I,UB,UA,D])
    +- Exchange(distribution=[hash[id]], changelogMode=[UA,D])
-      +- WatermarkAssigner(rowtime=[ts], watermark=[-(ts, 1000:INTERVAL SECOND)], changelogMode=[UA,D])
-         +- Calc(select=[id, a, +(a, 1) AS b, c, TO_TIMESTAMP(c) AS ts], changelogMode=[UA,D])
-            +- TableSourceScan(table=[[default_catalog, default_database, src]], fields=[id, a, c], changelogMode=[UA,D])
+      +- Calc(select=[id, a, b, c], changelogMode=[UA,D])
+         +- WatermarkAssigner(rowtime=[ts], watermark=[-(ts, 1000:INTERVAL SECOND)], changelogMode=[UA,D])
+            +- Calc(select=[id, a, +(a, 1) AS b, c, TO_TIMESTAMP(c) AS ts], changelogMode=[UA,D])
+               +- TableSourceScan(table=[[default_catalog, default_database, src]], fields=[id, a, c], changelogMode=[UA,D])
 ]]>
     </Resource>
   </TestCase>

Reply via email to