This is an automated email from the ASF dual-hosted git repository.

snuyanzin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new e5f53f272dc [FLINK-38693][table] Changelog normalize shouldn't call CNF for marking program
e5f53f272dc is described below

commit e5f53f272dcdac4249e2433091fbacf0e1df2daf
Author: Sergey Nuyanzin <[email protected]>
AuthorDate: Thu Nov 20 18:46:02 2025 +0100

    [FLINK-38693][table] Changelog normalize shouldn't call CNF for marking program
---
 .../stream/FlinkMarkChangelogNormalizeProgram.java | 85 ++++++++++++++++------
 1 file changed, 61 insertions(+), 24 deletions(-)

diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/physical/stream/FlinkMarkChangelogNormalizeProgram.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/physical/stream/FlinkMarkChangelogNormalizeProgram.java
index 4c0d09bf7cd..24d3ee495c7 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/physical/stream/FlinkMarkChangelogNormalizeProgram.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/physical/stream/FlinkMarkChangelogNormalizeProgram.java
@@ -23,19 +23,24 @@ import 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalC
 import 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalChangelogNormalize;
 import 
org.apache.flink.table.planner.plan.optimize.program.FlinkOptimizeProgram;
 import 
org.apache.flink.table.planner.plan.optimize.program.StreamOptimizeContext;
-import org.apache.flink.table.planner.plan.utils.FlinkRexUtil;
 
+import org.apache.calcite.plan.RelOptUtil;
 import org.apache.calcite.rel.RelNode;
 import org.apache.calcite.rel.core.TableScan;
+import org.apache.calcite.rex.RexBuilder;
+import org.apache.calcite.rex.RexCall;
+import org.apache.calcite.rex.RexLocalRef;
 import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.rex.RexUtil;
+import org.apache.calcite.sql.SqlKind;
+import org.apache.calcite.sql.fun.SqlStdOperatorTable;
 
 import java.util.ArrayList;
-import java.util.Comparator;
 import java.util.HashMap;
-import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
-import java.util.Set;
+import java.util.Objects;
+import java.util.stream.Collectors;
 
 /**
  * A {@link FlinkOptimizeProgram} that marks ChangelogNormalize nodes using 
the same source and
@@ -149,27 +154,47 @@ public class FlinkMarkChangelogNormalizeProgram
                 continue;
             }
 
-            final Set<RexNode> common = 
calculateCommonCondition(changelogNormalizeContexts);
+            final List<RexNode> commons =
+                    calculateCommonCondition(rexBuilder, 
changelogNormalizeContexts);
             for (ChangelogNormalizeContext ctx : changelogNormalizeContexts) {
                 ctx.getChangelogNormalize().markSourceReuse();
-                if (!common.isEmpty()) {
-                    
ctx.getChangelogNormalize().setCommonFilter(common.toArray(new RexNode[0]));
+                if (!commons.isEmpty()) {
+                    
ctx.getChangelogNormalize().setCommonFilter(commons.toArray(new RexNode[0]));
                 }
             }
         }
         return root;
     }
 
-    private Set<RexNode> calculateCommonCondition(
-            List<ChangelogNormalizeContext> changelogNormalizeContexts) {
-        changelogNormalizeContexts.sort(Comparator.comparingInt(o -> 
o.getConditions().size()));
-        final Set<RexNode> common =
-                new 
HashSet<>(changelogNormalizeContexts.get(0).getConditions());
+    private List<RexNode> calculateCommonCondition(
+            RexBuilder rexBuilder, List<ChangelogNormalizeContext> 
changelogNormalizeContexts) {
+        if (changelogNormalizeContexts.stream()
+                .map(ChangelogNormalizeContext::getConditions)
+                .anyMatch(Objects::isNull)) {
+            return List.of();
+        }
+
+        final RexNode or =
+                rexBuilder.makeCall(
+                        SqlStdOperatorTable.OR,
+                        changelogNormalizeContexts.stream()
+                                .map(ChangelogNormalizeContext::getConditions)
+                                .collect(Collectors.toList()));
+        final RexCall factors = (RexCall) RexUtil.pullFactors(rexBuilder, or);
 
-        for (int i = 1; i < changelogNormalizeContexts.size() && 
!common.isEmpty(); i++) {
-            
common.retainAll(changelogNormalizeContexts.get(i).getConditions());
+        final List<RexNode> commonCondition = new ArrayList<>();
+        // Since we are interested in factors only then look for AND
+        if (factors.getKind() == SqlKind.AND) {
+            for (RexNode node : factors.getOperands()) {
+                // If there is OR on top level then it is not a common factor 
anymore
+                if (node.getKind() == SqlKind.OR) {
+                    break;
+                }
+                
commonCondition.addAll(RelOptUtil.conjunctions(RexUtil.toCnf(rexBuilder, 
node)));
+            }
+            return commonCondition;
         }
-        return common;
+        return List.of();
     }
 
     private void gatherTableScanToChangelogNormalizeMap(
@@ -182,12 +207,24 @@ public class FlinkMarkChangelogNormalizeProgram
                         (StreamPhysicalChangelogNormalize) input;
                 if (curRelNode instanceof StreamPhysicalCalc) {
                     StreamPhysicalCalc calc = (StreamPhysicalCalc) curRelNode;
-                    final List<RexNode> conditions =
-                            FlinkRexUtil.extractConjunctiveConditions(
-                                    rexBuilder, calc.getProgram());
+                    RexLocalRef localRef = calc.getProgram().getCondition();
+                    final RexNode condition;
+                    if (localRef == null) {
+                        condition = null;
+                    } else {
+                        // Expanded Sarg allows to extract partial common 
filter out of it
+                        RexNode rexNodeWithExpandedSearch =
+                                RexUtil.expandSearch(
+                                        rexBuilder,
+                                        calc.getProgram(),
+                                        
calc.getProgram().expandLocalRef(localRef));
+                        // First pull factors from conditions per Changelog 
Normalize node
+                        // then find the common for all of them
+                        condition = RexUtil.pullFactors(rexBuilder, 
rexNodeWithExpandedSearch);
+                    }
                     gatherTableScanToChangelogNormalizeMap(
                             input,
-                            ChangelogNormalizeContext.of(changelogNormalize, 
conditions),
+                            ChangelogNormalizeContext.of(changelogNormalize, 
condition),
                             map);
                 }
             } else {
@@ -211,16 +248,16 @@ public class FlinkMarkChangelogNormalizeProgram
 
     private static class ChangelogNormalizeContext {
         private final StreamPhysicalChangelogNormalize changelogNormalize;
-        private final List<RexNode> conditions;
+        private final RexNode conditions;
 
-        public ChangelogNormalizeContext(
-                StreamPhysicalChangelogNormalize changelogNormalize, 
List<RexNode> conditions) {
+        private ChangelogNormalizeContext(
+                StreamPhysicalChangelogNormalize changelogNormalize, RexNode 
conditions) {
             this.changelogNormalize = changelogNormalize;
             this.conditions = conditions;
         }
 
         public static ChangelogNormalizeContext of(
-                StreamPhysicalChangelogNormalize changelogNormalize, 
List<RexNode> conditions) {
+                StreamPhysicalChangelogNormalize changelogNormalize, RexNode 
conditions) {
             return new ChangelogNormalizeContext(changelogNormalize, 
conditions);
         }
 
@@ -228,7 +265,7 @@ public class FlinkMarkChangelogNormalizeProgram
             return changelogNormalize;
         }
 
-        public List<RexNode> getConditions() {
+        public RexNode getConditions() {
             return conditions;
         }
     }

Reply via email to