This is an automated email from the ASF dual-hosted git repository.
snuyanzin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new e5f53f272dc [FLINK-38693][table] Changelog normalize shouldn't call
CNF for marking program
e5f53f272dc is described below
commit e5f53f272dcdac4249e2433091fbacf0e1df2daf
Author: Sergey Nuyanzin <[email protected]>
AuthorDate: Thu Nov 20 18:46:02 2025 +0100
[FLINK-38693][table] Changelog normalize shouldn't call CNF for marking
program
---
.../stream/FlinkMarkChangelogNormalizeProgram.java | 85 ++++++++++++++++------
1 file changed, 61 insertions(+), 24 deletions(-)
diff --git
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/physical/stream/FlinkMarkChangelogNormalizeProgram.java
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/physical/stream/FlinkMarkChangelogNormalizeProgram.java
index 4c0d09bf7cd..24d3ee495c7 100644
---
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/physical/stream/FlinkMarkChangelogNormalizeProgram.java
+++
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/physical/stream/FlinkMarkChangelogNormalizeProgram.java
@@ -23,19 +23,24 @@ import
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalC
import
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalChangelogNormalize;
import
org.apache.flink.table.planner.plan.optimize.program.FlinkOptimizeProgram;
import
org.apache.flink.table.planner.plan.optimize.program.StreamOptimizeContext;
-import org.apache.flink.table.planner.plan.utils.FlinkRexUtil;
+import org.apache.calcite.plan.RelOptUtil;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.core.TableScan;
+import org.apache.calcite.rex.RexBuilder;
+import org.apache.calcite.rex.RexCall;
+import org.apache.calcite.rex.RexLocalRef;
import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.rex.RexUtil;
+import org.apache.calcite.sql.SqlKind;
+import org.apache.calcite.sql.fun.SqlStdOperatorTable;
import java.util.ArrayList;
-import java.util.Comparator;
import java.util.HashMap;
-import java.util.HashSet;
import java.util.List;
import java.util.Map;
-import java.util.Set;
+import java.util.Objects;
+import java.util.stream.Collectors;
/**
* A {@link FlinkOptimizeProgram} that marks ChangelogNormalize nodes using
the same source and
@@ -149,27 +154,47 @@ public class FlinkMarkChangelogNormalizeProgram
continue;
}
- final Set<RexNode> common =
calculateCommonCondition(changelogNormalizeContexts);
+ final List<RexNode> commons =
+ calculateCommonCondition(rexBuilder,
changelogNormalizeContexts);
for (ChangelogNormalizeContext ctx : changelogNormalizeContexts) {
ctx.getChangelogNormalize().markSourceReuse();
- if (!common.isEmpty()) {
-
ctx.getChangelogNormalize().setCommonFilter(common.toArray(new RexNode[0]));
+ if (!commons.isEmpty()) {
+
ctx.getChangelogNormalize().setCommonFilter(commons.toArray(new RexNode[0]));
}
}
}
return root;
}
- private Set<RexNode> calculateCommonCondition(
- List<ChangelogNormalizeContext> changelogNormalizeContexts) {
- changelogNormalizeContexts.sort(Comparator.comparingInt(o ->
o.getConditions().size()));
- final Set<RexNode> common =
- new
HashSet<>(changelogNormalizeContexts.get(0).getConditions());
+ private List<RexNode> calculateCommonCondition(
+ RexBuilder rexBuilder, List<ChangelogNormalizeContext>
changelogNormalizeContexts) {
+ if (changelogNormalizeContexts.stream()
+ .map(ChangelogNormalizeContext::getConditions)
+ .anyMatch(Objects::isNull)) {
+ return List.of();
+ }
+
+ final RexNode or =
+ rexBuilder.makeCall(
+ SqlStdOperatorTable.OR,
+ changelogNormalizeContexts.stream()
+ .map(ChangelogNormalizeContext::getConditions)
+ .collect(Collectors.toList()));
+ final RexCall factors = (RexCall) RexUtil.pullFactors(rexBuilder, or);
- for (int i = 1; i < changelogNormalizeContexts.size() &&
!common.isEmpty(); i++) {
-
common.retainAll(changelogNormalizeContexts.get(i).getConditions());
+ final List<RexNode> commonCondition = new ArrayList<>();
+ // Only the common factors are of interest, so look for a top-level AND
+ if (factors.getKind() == SqlKind.AND) {
+ for (RexNode node : factors.getOperands()) {
+ // An OR at the top level is no longer a common factor, so stop here
+ if (node.getKind() == SqlKind.OR) {
+ break;
+ }
+
commonCondition.addAll(RelOptUtil.conjunctions(RexUtil.toCnf(rexBuilder,
node)));
+ }
+ return commonCondition;
}
- return common;
+ return List.of();
}
private void gatherTableScanToChangelogNormalizeMap(
@@ -182,12 +207,24 @@ public class FlinkMarkChangelogNormalizeProgram
(StreamPhysicalChangelogNormalize) input;
if (curRelNode instanceof StreamPhysicalCalc) {
StreamPhysicalCalc calc = (StreamPhysicalCalc) curRelNode;
- final List<RexNode> conditions =
- FlinkRexUtil.extractConjunctiveConditions(
- rexBuilder, calc.getProgram());
+ RexLocalRef localRef = calc.getProgram().getCondition();
+ final RexNode condition;
+ if (localRef == null) {
+ condition = null;
+ } else {
+ // Expanding the Sarg makes it possible to extract a partial common filter from it
+ RexNode rexNodeWithExpandedSearch =
+ RexUtil.expandSearch(
+ rexBuilder,
+ calc.getProgram(),
+
calc.getProgram().expandLocalRef(localRef));
+ // First pull factors from the condition of each ChangelogNormalize node,
+ // then find the factors common to all of them
+ condition = RexUtil.pullFactors(rexBuilder,
rexNodeWithExpandedSearch);
+ }
gatherTableScanToChangelogNormalizeMap(
input,
- ChangelogNormalizeContext.of(changelogNormalize,
conditions),
+ ChangelogNormalizeContext.of(changelogNormalize,
condition),
map);
}
} else {
@@ -211,16 +248,16 @@ public class FlinkMarkChangelogNormalizeProgram
private static class ChangelogNormalizeContext {
private final StreamPhysicalChangelogNormalize changelogNormalize;
- private final List<RexNode> conditions;
+ private final RexNode conditions;
- public ChangelogNormalizeContext(
- StreamPhysicalChangelogNormalize changelogNormalize,
List<RexNode> conditions) {
+ private ChangelogNormalizeContext(
+ StreamPhysicalChangelogNormalize changelogNormalize, RexNode
conditions) {
this.changelogNormalize = changelogNormalize;
this.conditions = conditions;
}
public static ChangelogNormalizeContext of(
- StreamPhysicalChangelogNormalize changelogNormalize,
List<RexNode> conditions) {
+ StreamPhysicalChangelogNormalize changelogNormalize, RexNode
conditions) {
return new ChangelogNormalizeContext(changelogNormalize,
conditions);
}
@@ -228,7 +265,7 @@ public class FlinkMarkChangelogNormalizeProgram
return changelogNormalize;
}
- public List<RexNode> getConditions() {
+ public RexNode getConditions() {
return conditions;
}
}