This is an automated email from the ASF dual-hosted git repository. snuyanzin pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push: new aa97d86c1ed [FLINK-38317][table] Make PushCalcPastChangelogNormalizeRule tolerant to trimmed fields aa97d86c1ed is described below commit aa97d86c1ed252d92c3cc4e31718d42b50408f64 Author: Sergey Nuyanzin <snuyan...@gmail.com> AuthorDate: Thu Sep 4 09:31:07 2025 +0200 [FLINK-38317][table] Make PushCalcPastChangelogNormalizeRule tolerant to trimmed fields --- .../stream/PushCalcPastChangelogNormalizeRule.java | 7 +- .../PushCalcPastChangelogNormalizeRuleTest.java | 20 ++++++ .../PushCalcPastChangelogNormalizeRuleTest.xml | 82 ++++++++++++++++++++++ 3 files changed, 107 insertions(+), 2 deletions(-) diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/physical/stream/PushCalcPastChangelogNormalizeRule.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/physical/stream/PushCalcPastChangelogNormalizeRule.java index bbe64d2e5ac..da152485828 100644 --- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/physical/stream/PushCalcPastChangelogNormalizeRule.java +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/physical/stream/PushCalcPastChangelogNormalizeRule.java @@ -326,8 +326,11 @@ public class PushCalcPastChangelogNormalizeRule } if (!conditions.isEmpty()) { final RexNode condition = relBuilder.and(conditions); - programBuilder.addCondition( - FlinkRexUtil.simplify(relBuilder.getRexBuilder(), condition, rexExecutor)); + final RexNode simplifiedCondition = + FlinkRexUtil.simplify(relBuilder.getRexBuilder(), condition, rexExecutor); + if (!condition.isAlwaysTrue()) { + programBuilder.addCondition(adjustInputRef(simplifiedCondition, inputRefMapping)); + } } final RexProgram newProgram = programBuilder.getProgram(); diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/rules/physical/stream/PushCalcPastChangelogNormalizeRuleTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/rules/physical/stream/PushCalcPastChangelogNormalizeRuleTest.java index 3e466f40fc0..a31f9c574c0 100644 --- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/rules/physical/stream/PushCalcPastChangelogNormalizeRuleTest.java +++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/rules/physical/stream/PushCalcPastChangelogNormalizeRuleTest.java @@ -217,4 +217,24 @@ class PushCalcPastChangelogNormalizeRuleTest extends TableTestBase { util.tableEnv().createTable("T", sourceDescriptorWithTwoPrimaryKeys); util.verifyRelPlan("SELECT f1, f5 FROM T WHERE f1 < 1 AND f3 IS NOT NULL"); } + + @Test + void testPartialPushDownWithTrimmedFieldsAndDifferentProjection() { + util.tableEnv().createTable("T", sourceDescriptorWithTwoPrimaryKeys); + // verifyExecPlan is intended here as it will show whether the node is reused or not + util.verifyExecPlan( + "SELECT f3 FROM T WHERE f2 < 1 AND f2 > 0\n" + + " UNION SELECT f3 FROM T WHERE f2 < 3 AND f2 > 0\n" + + " INTERSECT SELECT f3 FROM T WHERE f2 > 0 AND f2 < 10"); + } + + @Test + void testPartialPushDownWithTrimmedFields() { + util.tableEnv().createTable("T", sourceDescriptorWithTwoPrimaryKeys); + // verifyExecPlan is intended here as it will show whether the node is reused or not + util.verifyExecPlan( + "SELECT f2 FROM T WHERE f2 < 1 AND f2 > 0\n" + + " UNION SELECT f2 FROM T WHERE f2 < 3 AND f2 > 0\n" + + " INTERSECT SELECT f2 FROM T WHERE f2 > 0 AND f2 < 10"); + } } diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/rules/physical/stream/PushCalcPastChangelogNormalizeRuleTest.xml b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/rules/physical/stream/PushCalcPastChangelogNormalizeRuleTest.xml index 76fb44c599a..506a22b2afd 100644 --- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/rules/physical/stream/PushCalcPastChangelogNormalizeRuleTest.xml +++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/rules/physical/stream/PushCalcPastChangelogNormalizeRuleTest.xml @@ -292,6 +292,88 @@ Calc(select=[f1, f5]) +- Exchange(distribution=[hash[f1, f2]]) +- Calc(select=[f1, f2, f3, f5], where=[<(f1, 1)]) +- TableSourceScan(table=[[default_catalog, default_database, T]], fields=[f0, f1, f2, f3, f4, f5, f6, f7]) +]]> + </Resource> + </TestCase> + <TestCase name="testPartialPushDownWithTrimmedFields"> + <Resource name="sql"> + <![CDATA[SELECT f2 FROM T WHERE f2 < 1 AND f2 > 0 + UNION SELECT f2 FROM T WHERE f2 < 3 AND f2 > 0 + INTERSECT SELECT f2 FROM T WHERE f2 > 0 AND f2 < 10]]> + </Resource> + <Resource name="ast"> + <![CDATA[ +LogicalUnion(all=[false]) +:- LogicalProject(f2=[$2]) +: +- LogicalFilter(condition=[AND(<($2, 1), >($2, 0))]) +: +- LogicalTableScan(table=[[default_catalog, default_database, T]]) ++- LogicalIntersect(all=[false]) + :- LogicalProject(f2=[$2]) + : +- LogicalFilter(condition=[AND(<($2, 3), >($2, 0))]) + : +- LogicalTableScan(table=[[default_catalog, default_database, T]]) + +- LogicalProject(f2=[$2]) + +- LogicalFilter(condition=[AND(>($2, 0), <($2, 10))]) + +- LogicalTableScan(table=[[default_catalog, default_database, T]]) +]]> + </Resource> + <Resource name="optimized exec plan"> + <![CDATA[ +GroupAggregate(groupBy=[f2], select=[f2]) ++- Exchange(distribution=[hash[f2]]) + +- Union(all=[true], union=[f2]) + :- Calc(select=[f2], where=[(f2 < 1)]) + : +- ChangelogNormalize(key=[f1, f2])(reuse_id=[1]) + : +- Exchange(distribution=[hash[f1, f2]]) + : +- Calc(select=[f1, f2], where=[(f2 > 0)]) + : +- TableSourceScan(table=[[default_catalog, default_database, T]], fields=[f0, f1, f2, f3, f4, f5, f6, f7]) + +- Join(joinType=[LeftSemiJoin], where=[(f2 = f20)], select=[f2], leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey]) + :- Exchange(distribution=[hash[f2]]) + : +- Calc(select=[f2], where=[(f2 < 3)]) + : +- Reused(reference_id=[1]) + +- Exchange(distribution=[hash[f2]]) + +- Calc(select=[f2], where=[(f2 < 10)]) + +- Reused(reference_id=[1]) +]]> + </Resource> + </TestCase> + <TestCase name="testPartialPushDownWithTrimmedFieldsAndDifferentProjection"> + <Resource name="sql"> + <![CDATA[SELECT f3 FROM T WHERE f2 < 1 AND f2 > 0 + UNION SELECT f3 FROM T WHERE f2 < 3 AND f2 > 0 + INTERSECT SELECT f3 FROM T WHERE f2 > 0 AND f2 < 10]]> + </Resource> + <Resource name="ast"> + <![CDATA[ +LogicalUnion(all=[false]) +:- LogicalProject(f3=[$3]) +: +- LogicalFilter(condition=[AND(<($2, 1), >($2, 0))]) +: +- LogicalTableScan(table=[[default_catalog, default_database, T]]) ++- LogicalIntersect(all=[false]) + :- LogicalProject(f3=[$3]) + : +- LogicalFilter(condition=[AND(<($2, 3), >($2, 0))]) + : +- LogicalTableScan(table=[[default_catalog, default_database, T]]) + +- LogicalProject(f3=[$3]) + +- LogicalFilter(condition=[AND(>($2, 0), <($2, 10))]) + +- LogicalTableScan(table=[[default_catalog, default_database, T]]) +]]> + </Resource> + <Resource name="optimized exec plan"> + <![CDATA[ +GroupAggregate(groupBy=[f3], select=[f3]) ++- Exchange(distribution=[hash[f3]]) + +- Union(all=[true], union=[f3]) + :- Calc(select=[f3], where=[(f2 < 1)]) + : +- ChangelogNormalize(key=[f1, f2])(reuse_id=[1]) + : +- Exchange(distribution=[hash[f1, f2]]) + : +- Calc(select=[f1, f2, f3], where=[(f2 > 0)]) + : +- TableSourceScan(table=[[default_catalog, default_database, T]], fields=[f0, f1, f2, f3, f4, f5, f6, f7]) + +- Join(joinType=[LeftSemiJoin], where=[IS NOT DISTINCT FROM(f3, f30)], select=[f3], leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey]) + :- Exchange(distribution=[hash[f3]]) + : +- Calc(select=[f3], where=[(f2 < 3)]) + : +- Reused(reference_id=[1]) + +- Exchange(distribution=[hash[f3]]) + +- Calc(select=[f3], where=[(f2 < 10)]) + +- Reused(reference_id=[1]) ]]> </Resource> </TestCase>