This is an automated email from the ASF dual-hosted git repository.

snuyanzin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new aa97d86c1ed [FLINK-38317][table] Make PushCalcPastChangelogNormalizeRule tolerant to trimmed fields
aa97d86c1ed is described below

commit aa97d86c1ed252d92c3cc4e31718d42b50408f64
Author: Sergey Nuyanzin <snuyan...@gmail.com>
AuthorDate: Thu Sep 4 09:31:07 2025 +0200

    [FLINK-38317][table] Make PushCalcPastChangelogNormalizeRule tolerant to trimmed fields
---
 .../stream/PushCalcPastChangelogNormalizeRule.java |  7 +-
 .../PushCalcPastChangelogNormalizeRuleTest.java    | 20 ++++++
 .../PushCalcPastChangelogNormalizeRuleTest.xml     | 82 ++++++++++++++++++++++
 3 files changed, 107 insertions(+), 2 deletions(-)

diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/physical/stream/PushCalcPastChangelogNormalizeRule.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/physical/stream/PushCalcPastChangelogNormalizeRule.java
index bbe64d2e5ac..da152485828 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/physical/stream/PushCalcPastChangelogNormalizeRule.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/physical/stream/PushCalcPastChangelogNormalizeRule.java
@@ -326,8 +326,11 @@ public class PushCalcPastChangelogNormalizeRule
         }
         if (!conditions.isEmpty()) {
             final RexNode condition = relBuilder.and(conditions);
-            programBuilder.addCondition(
-                    FlinkRexUtil.simplify(relBuilder.getRexBuilder(), condition, rexExecutor));
+            final RexNode simplifiedCondition =
+                    FlinkRexUtil.simplify(relBuilder.getRexBuilder(), condition, rexExecutor);
+            if (!condition.isAlwaysTrue()) {
+                programBuilder.addCondition(adjustInputRef(simplifiedCondition, inputRefMapping));
+            }
         }
 
         final RexProgram newProgram = programBuilder.getProgram();
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/rules/physical/stream/PushCalcPastChangelogNormalizeRuleTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/rules/physical/stream/PushCalcPastChangelogNormalizeRuleTest.java
index 3e466f40fc0..a31f9c574c0 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/rules/physical/stream/PushCalcPastChangelogNormalizeRuleTest.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/rules/physical/stream/PushCalcPastChangelogNormalizeRuleTest.java
@@ -217,4 +217,24 @@ class PushCalcPastChangelogNormalizeRuleTest extends TableTestBase {
         util.tableEnv().createTable("T", sourceDescriptorWithTwoPrimaryKeys);
         util.verifyRelPlan("SELECT f1, f5 FROM T WHERE f1 < 1 AND f3 IS NOT NULL");
     }
+
+    @Test
+    void testPartialPushDownWithTrimmedFieldsAndDifferentProjection() {
+        util.tableEnv().createTable("T", sourceDescriptorWithTwoPrimaryKeys);
+        // verifyExecPlan is intended here as it will show whether the node is reused or not
+        util.verifyExecPlan(
+                "SELECT f3 FROM T WHERE f2 < 1 AND f2 > 0\n"
+                        + " UNION SELECT f3 FROM T WHERE f2 < 3 AND f2 > 0\n"
+                        + " INTERSECT SELECT f3 FROM T WHERE f2 > 0 AND f2 < 10");
+    }
+
+    @Test
+    void testPartialPushDownWithTrimmedFields() {
+        util.tableEnv().createTable("T", sourceDescriptorWithTwoPrimaryKeys);
+        // verifyExecPlan is intended here as it will show whether the node is reused or not
+        util.verifyExecPlan(
+                "SELECT f2 FROM T WHERE f2 < 1 AND f2 > 0\n"
+                        + " UNION SELECT f2 FROM T WHERE f2 < 3 AND f2 > 0\n"
+                        + " INTERSECT SELECT f2 FROM T WHERE f2 > 0 AND f2 < 10");
+    }
 }
diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/rules/physical/stream/PushCalcPastChangelogNormalizeRuleTest.xml b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/rules/physical/stream/PushCalcPastChangelogNormalizeRuleTest.xml
index 76fb44c599a..506a22b2afd 100644
--- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/rules/physical/stream/PushCalcPastChangelogNormalizeRuleTest.xml
+++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/rules/physical/stream/PushCalcPastChangelogNormalizeRuleTest.xml
@@ -292,6 +292,88 @@ Calc(select=[f1, f5])
    +- Exchange(distribution=[hash[f1, f2]])
       +- Calc(select=[f1, f2, f3, f5], where=[<(f1, 1)])
          +- TableSourceScan(table=[[default_catalog, default_database, T]], fields=[f0, f1, f2, f3, f4, f5, f6, f7])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testPartialPushDownWithTrimmedFields">
+    <Resource name="sql">
+      <![CDATA[SELECT f2 FROM T WHERE f2 < 1 AND f2 > 0
+ UNION SELECT f2 FROM T WHERE f2 < 3 AND f2 > 0
+ INTERSECT SELECT f2 FROM T WHERE f2 > 0 AND f2 < 10]]>
+    </Resource>
+    <Resource name="ast">
+      <![CDATA[
+LogicalUnion(all=[false])
+:- LogicalProject(f2=[$2])
+:  +- LogicalFilter(condition=[AND(<($2, 1), >($2, 0))])
+:     +- LogicalTableScan(table=[[default_catalog, default_database, T]])
++- LogicalIntersect(all=[false])
+   :- LogicalProject(f2=[$2])
+   :  +- LogicalFilter(condition=[AND(<($2, 3), >($2, 0))])
+   :     +- LogicalTableScan(table=[[default_catalog, default_database, T]])
+   +- LogicalProject(f2=[$2])
+      +- LogicalFilter(condition=[AND(>($2, 0), <($2, 10))])
+         +- LogicalTableScan(table=[[default_catalog, default_database, T]])
+]]>
+    </Resource>
+    <Resource name="optimized exec plan">
+      <![CDATA[
+GroupAggregate(groupBy=[f2], select=[f2])
++- Exchange(distribution=[hash[f2]])
+   +- Union(all=[true], union=[f2])
+      :- Calc(select=[f2], where=[(f2 < 1)])
+      :  +- ChangelogNormalize(key=[f1, f2])(reuse_id=[1])
+      :     +- Exchange(distribution=[hash[f1, f2]])
+      :        +- Calc(select=[f1, f2], where=[(f2 > 0)])
+      :           +- TableSourceScan(table=[[default_catalog, default_database, T]], fields=[f0, f1, f2, f3, f4, f5, f6, f7])
+      +- Join(joinType=[LeftSemiJoin], where=[(f2 = f20)], select=[f2], leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey])
+         :- Exchange(distribution=[hash[f2]])
+         :  +- Calc(select=[f2], where=[(f2 < 3)])
+         :     +- Reused(reference_id=[1])
+         +- Exchange(distribution=[hash[f2]])
+            +- Calc(select=[f2], where=[(f2 < 10)])
+               +- Reused(reference_id=[1])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testPartialPushDownWithTrimmedFieldsAndDifferentProjection">
+    <Resource name="sql">
+      <![CDATA[SELECT f3 FROM T WHERE f2 < 1 AND f2 > 0
+ UNION SELECT f3 FROM T WHERE f2 < 3 AND f2 > 0
+ INTERSECT SELECT f3 FROM T WHERE f2 > 0 AND f2 < 10]]>
+    </Resource>
+    <Resource name="ast">
+      <![CDATA[
+LogicalUnion(all=[false])
+:- LogicalProject(f3=[$3])
+:  +- LogicalFilter(condition=[AND(<($2, 1), >($2, 0))])
+:     +- LogicalTableScan(table=[[default_catalog, default_database, T]])
++- LogicalIntersect(all=[false])
+   :- LogicalProject(f3=[$3])
+   :  +- LogicalFilter(condition=[AND(<($2, 3), >($2, 0))])
+   :     +- LogicalTableScan(table=[[default_catalog, default_database, T]])
+   +- LogicalProject(f3=[$3])
+      +- LogicalFilter(condition=[AND(>($2, 0), <($2, 10))])
+         +- LogicalTableScan(table=[[default_catalog, default_database, T]])
+]]>
+    </Resource>
+    <Resource name="optimized exec plan">
+      <![CDATA[
+GroupAggregate(groupBy=[f3], select=[f3])
++- Exchange(distribution=[hash[f3]])
+   +- Union(all=[true], union=[f3])
+      :- Calc(select=[f3], where=[(f2 < 1)])
+      :  +- ChangelogNormalize(key=[f1, f2])(reuse_id=[1])
+      :     +- Exchange(distribution=[hash[f1, f2]])
+      :        +- Calc(select=[f1, f2, f3], where=[(f2 > 0)])
+      :           +- TableSourceScan(table=[[default_catalog, default_database, T]], fields=[f0, f1, f2, f3, f4, f5, f6, f7])
+      +- Join(joinType=[LeftSemiJoin], where=[IS NOT DISTINCT FROM(f3, f30)], select=[f3], leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey])
+         :- Exchange(distribution=[hash[f3]])
+         :  +- Calc(select=[f3], where=[(f2 < 3)])
+         :     +- Reused(reference_id=[1])
+         +- Exchange(distribution=[hash[f3]])
+            +- Calc(select=[f3], where=[(f2 < 10)])
+               +- Reused(reference_id=[1])
 ]]>
     </Resource>
   </TestCase>

Reply via email to