This is an automated email from the ASF dual-hosted git repository.
morrysnow pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new ff8e3d34811 [fix](unique function) fix push unique function through
join (#58516)
ff8e3d34811 is described below
commit ff8e3d348115dbcb67fc0030bf693115dd09e247
Author: yujun <[email protected]>
AuthorDate: Mon Dec 8 15:22:42 2025 +0800
[fix](unique function) fix push unique function through join (#58516)
If an expression above a join contains a unique function (e.g. random()),
it can't be pushed through or inside the join. For example, for the SQL
`t1 join t2 where random() < 0.5`, suppose t1 contains m rows and t2
contains n rows; this query is expected to return about m * n * 0.5 rows.
If we push the random() predicate below the join into its children, the
query is rewritten as `(t1 where random() < 0.5) join (t2 where random() <
0.5)`, which is expected to return only about (m*0.5) * (n*0.5) =
m * n * 0.25 rows, fewer than the original query.
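For the join reorder rule: if a filter's child is a join, this rule may
extract conjuncts from the filter and push them into the join, which causes
the same problem. So conjuncts that contain a unique function are kept in a
filter above the reordered joins, and a lower filter that contains a unique
function stops the joins beneath it from being reordered with the others.
If every conjunct of the top filter contains a unique function, the rule is
not applied.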
---
.../rules/rewrite/PushDownFilterThroughJoin.java | 7 ++
.../rules/rewrite/PushFilterInsideJoin.java | 23 ++++--
.../doris/nereids/rules/rewrite/ReorderJoin.java | 42 ++++++++++-
...wn_filter_through_join_with_unique_function.out | 67 +++++++++++++++++
...filter_through_join_with_unique_function.groovy | 84 ++++++++++++++++++++++
5 files changed, 217 insertions(+), 6 deletions(-)
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/PushDownFilterThroughJoin.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/PushDownFilterThroughJoin.java
index 6dcf1258ce5..e717a00b3db 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/PushDownFilterThroughJoin.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/PushDownFilterThroughJoin.java
@@ -116,6 +116,10 @@ public class PushDownFilterThroughJoin extends
OneRewriteRuleFactory {
Set<Expression> rightPredicates = Sets.newLinkedHashSet();
Set<Expression> remainingPredicates = Sets.newLinkedHashSet();
for (Expression p : filterPredicates) {
+ if (p.containsUniqueFunction()) {
+ remainingPredicates.add(p);
+ continue;
+ }
Set<Slot> slots = p.collect(SlotReference.class::isInstance);
if (slots.isEmpty()) {
leftPredicates.add(p);
@@ -154,6 +158,9 @@ public class PushDownFilterThroughJoin extends
OneRewriteRuleFactory {
if (!(predicate instanceof EqualTo)) {
return false;
}
+ if (predicate.containsUniqueFunction()) {
+ return false;
+ }
EqualTo equalTo = (EqualTo) predicate;
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/PushFilterInsideJoin.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/PushFilterInsideJoin.java
index 69d990c3bdb..c1640671d33 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/PushFilterInsideJoin.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/PushFilterInsideJoin.java
@@ -26,8 +26,10 @@ import org.apache.doris.nereids.trees.expressions.Slot;
import org.apache.doris.nereids.trees.plans.Plan;
import org.apache.doris.nereids.trees.plans.logical.LogicalJoin;
import org.apache.doris.nereids.util.ExpressionUtils;
+import org.apache.doris.nereids.util.PlanUtils;
import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
import java.util.List;
import java.util.Set;
@@ -47,18 +49,31 @@ public class PushFilterInsideJoin extends
OneRewriteRuleFactory {
.when(filter -> filter.child().getJoinType().isCrossJoin()
|| filter.child().getJoinType().isInnerJoin())
.then(filter -> {
- List<Expression> otherConditions =
Lists.newArrayList(filter.getConjuncts());
LogicalJoin<Plan, Plan> join = filter.child();
Set<Slot> childOutput = join.getOutputSet();
- if
(ExpressionUtils.getInputSlotSet(otherConditions).stream()
+ if
(ExpressionUtils.getInputSlotSet(filter.getConjuncts()).stream()
.filter(MarkJoinSlotReference.class::isInstance)
.anyMatch(slot -> childOutput.contains(slot))) {
return null;
}
+ Set<Expression> remainConditions = Sets.newLinkedHashSet();
+ List<Expression> otherConditions =
Lists.newArrayListWithExpectedSize(
+ filter.getConjuncts().size() +
join.getOtherJoinConjuncts().size());
+ for (Expression expr : filter.getConjuncts()) {
+ if (expr.containsUniqueFunction()) {
+ remainConditions.add(expr);
+ } else {
+ otherConditions.add(expr);
+ }
+ }
+ if (otherConditions.isEmpty()) {
+ return null;
+ }
otherConditions.addAll(join.getOtherJoinConjuncts());
- return new LogicalJoin<>(join.getJoinType(),
join.getHashJoinConjuncts(),
+ return PlanUtils.filterOrSelf(remainConditions, new
LogicalJoin<>(
+ join.getJoinType(), join.getHashJoinConjuncts(),
otherConditions, join.getDistributeHint(),
join.getMarkJoinSlotReference(),
- join.children(), join.getJoinReorderContext());
+ join.children(), join.getJoinReorderContext()));
}).toRule(RuleType.PUSH_FILTER_INSIDE_JOIN);
}
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/ReorderJoin.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/ReorderJoin.java
index 0a0971e3de2..f5bb7323155 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/ReorderJoin.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/ReorderJoin.java
@@ -40,6 +40,7 @@ import com.google.common.collect.ImmutableList.Builder;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
import java.util.LinkedHashSet;
import java.util.List;
@@ -59,6 +60,14 @@ import java.util.stream.Collectors;
*
* output:
* SELECT * FROM t1 JOIN t3 ON t1.id=t3.id JOIN t2 ON t2.id=t3.id
+ *
+ * NOTICE:
+ * ReorderJoin will extract all filter's conjuncts together, then reorder the
joins.
+ * But if the conjuncts contains unique function, after reordering, the result
is error,
+ * it may generate more or less expected output rows than the origin
expression.
+ * We handle the top filter with unique function, but skip the below filters
with unique function.
+ * For the below filters contains unique function, it should not reorder their
joins with other joins.
+ *
* </pre>
* </p>
* Using the {@link MultiJoin} to complete this task.
@@ -83,17 +92,38 @@ public class ReorderJoin extends OneRewriteRuleFactory {
|| ((LogicalJoin<?, ?>)
ctx.root.child()).isLeadingJoin()) {
return null;
}
+
LogicalFilter<Plan> filter = ctx.root;
+ Set<Expression> uniqueExprConjuncts = Sets.newHashSet();
+ Set<Expression> nonUniqueExprConjuncts =
Sets.newHashSetWithExpectedSize(filter.getConjuncts().size());
+ for (Expression conjunct : filter.getConjuncts()) {
+ // after reorder and push down the random() down to lower
join,
+ // the rewritten sql may have less rows() than the origin
sql
+ if (conjunct.containsUniqueFunction()) {
+ uniqueExprConjuncts.add(conjunct);
+ } else {
+ nonUniqueExprConjuncts.add(conjunct);
+ }
+ }
+ if (nonUniqueExprConjuncts.isEmpty()) {
+ return null;
+ }
+ LogicalFilter<Plan> nonUniqueExprFilter =
uniqueExprConjuncts.isEmpty()
+ ? filter :
filter.withConjunctsAndChild(nonUniqueExprConjuncts, filter.child());
Map<Plan, DistributeHint> planToHintType = Maps.newHashMap();
- Plan plan = joinToMultiJoin(filter, planToHintType);
+ Plan plan = joinToMultiJoin(nonUniqueExprFilter,
planToHintType);
Preconditions.checkState(plan instanceof MultiJoin, "join to
multi join should return MultiJoin,"
+ " but return plan is " + plan.getType());
MultiJoin multiJoin = (MultiJoin) plan;
ctx.statementContext.addJoinFilters(multiJoin.getJoinFilter());
ctx.statementContext.setMaxNAryInnerJoin(multiJoin.children().size());
Plan after = multiJoinToJoin(multiJoin, planToHintType);
- return after;
+ if (after != null && !after.equals(nonUniqueExprFilter)) {
+ return PlanUtils.filterOrSelf(uniqueExprConjuncts, after);
+ } else {
+ return null;
+ }
}).toRule(RuleType.REORDER_JOIN);
}
@@ -118,6 +148,14 @@ public class ReorderJoin extends OneRewriteRuleFactory {
// Implicit rely on {rule: MergeFilters}, so don't exist
filter--filter--join.
if (plan instanceof LogicalFilter) {
LogicalFilter<?> filter = (LogicalFilter<?>) plan;
+ for (Expression conjunct : filter.getConjuncts()) {
+ // (t1 join t2) join t3 where t1.a = t3.x + random()
+ // if reorder, then may have ((t1 join t3) on t1.a = t3.x +
random()) join t2,
+ // then the reorder result will less rows than origin.
+ if (conjunct.containsUniqueFunction()) {
+ return plan;
+ }
+ }
joinFilter.addAll(filter.getConjuncts());
join = (LogicalJoin<?, ?>) filter.child();
} else {
diff --git
a/regression-test/data/nereids_rules_p0/unique_function/push_down_filter_through_join_with_unique_function.out
b/regression-test/data/nereids_rules_p0/unique_function/push_down_filter_through_join_with_unique_function.out
new file mode 100644
index 00000000000..efd13676d65
--- /dev/null
+++
b/regression-test/data/nereids_rules_p0/unique_function/push_down_filter_through_join_with_unique_function.out
@@ -0,0 +1,67 @@
+-- This file is automatically generated. You should know what you did if you
want to edit this
+-- !push_down_filter_through_join_1 --
+PhysicalResultSink
+--filter((random() > 0.1))
+----NestedLoopJoin[CROSS_JOIN]
+------PhysicalProject[t1.id]
+--------PhysicalOlapScan[t1]
+------PhysicalProject[t2.id]
+--------PhysicalOlapScan[t2]
+
+-- !push_down_filter_through_join_2 --
+PhysicalResultSink
+--PhysicalProject[t1.id, t2.id]
+----filter((random() > 0.1))
+------hashJoin[INNER_JOIN] hashCondition=((expr_cast(id as BIGINT) =
expr_(cast(id as BIGINT) + 10))) otherCondition=()
+--------PhysicalProject[cast(id as BIGINT) AS `expr_cast(id as BIGINT)`, t1.id]
+----------filter((t1.id > 100))
+------------PhysicalOlapScan[t1]
+--------PhysicalProject[(cast(id as BIGINT) + 10) AS `expr_(cast(id as BIGINT)
+ 10)`, t2.id]
+----------PhysicalOlapScan[t2]
+
+-- !push_down_filter_through_join_3 --
+PhysicalResultSink
+--filter(((cast(id as BIGINT) + random(1, 100)) > 100))
+----NestedLoopJoin[CROSS_JOIN]
+------PhysicalProject[t1.id]
+--------PhysicalOlapScan[t1]
+------PhysicalProject[t2.id]
+--------PhysicalOlapScan[t2]
+
+-- !reorder_join_1 --
+PhysicalResultSink
+--PhysicalProject[t1.id, t2.id]
+----filter(((cast(id as BIGINT) + random(1, 100)) = cast(id as BIGINT)))
+------hashJoin[INNER_JOIN] hashCondition=((expr_(cast(id as BIGINT) * 2) =
expr_(cast(id as BIGINT) * 5))) otherCondition=()
+--------PhysicalProject[(cast(id as BIGINT) * 2) AS `expr_(cast(id as BIGINT)
* 2)`, t1.id]
+----------PhysicalOlapScan[t1]
+--------PhysicalProject[(cast(id as BIGINT) * 5) AS `expr_(cast(id as BIGINT)
* 5)`, t2.id]
+----------PhysicalOlapScan[t2]
+
+-- !reorder_join_2 --
+PhysicalResultSink
+--filter(((cast(id as BIGINT) + random(1, 100)) = cast(id as BIGINT)))
+----NestedLoopJoin[CROSS_JOIN]
+------PhysicalProject[t1.id, t2.id]
+--------hashJoin[INNER_JOIN] hashCondition=((expr_(cast(id as BIGINT) * 2) =
expr_(cast(id as BIGINT) * 5))) otherCondition=()
+----------PhysicalProject[(cast(id as BIGINT) * 2) AS `expr_(cast(id as
BIGINT) * 2)`, t1.id]
+------------PhysicalOlapScan[t1]
+----------PhysicalProject[(cast(id as BIGINT) * 5) AS `expr_(cast(id as
BIGINT) * 5)`, t2.id]
+------------PhysicalOlapScan[t2]
+------PhysicalProject[t3.id]
+--------PhysicalOlapScan[t2(t3)]
+
+-- !reorder_join_3 --
+PhysicalResultSink
+--PhysicalProject[t1.id, t2.id, t3.id]
+----filter((random() > 10.0))
+------hashJoin[INNER_JOIN] hashCondition=((expr_(cast(id as BIGINT) * 2) =
expr_(cast(id as BIGINT) * 5))) otherCondition=()
+--------PhysicalProject[(cast(id as BIGINT) * 2) AS `expr_(cast(id as BIGINT)
* 2)`, t1.id, t2.id]
+----------NestedLoopJoin[CROSS_JOIN]
+------------PhysicalProject[t1.id]
+--------------PhysicalOlapScan[t1]
+------------PhysicalProject[t2.id]
+--------------PhysicalOlapScan[t2]
+--------PhysicalProject[(cast(id as BIGINT) * 5) AS `expr_(cast(id as BIGINT)
* 5)`, t3.id]
+----------PhysicalOlapScan[t2(t3)]
+
diff --git
a/regression-test/suites/nereids_rules_p0/unique_function/push_down_filter_through_join_with_unique_function.groovy
b/regression-test/suites/nereids_rules_p0/unique_function/push_down_filter_through_join_with_unique_function.groovy
new file mode 100644
index 00000000000..1afbef840d2
--- /dev/null
+++
b/regression-test/suites/nereids_rules_p0/unique_function/push_down_filter_through_join_with_unique_function.groovy
@@ -0,0 +1,84 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+suite('push_down_filter_through_join_with_unique_function') {
+ sql 'SET enable_nereids_planner=true'
+ sql 'SET runtime_filter_mode=OFF'
+ sql 'SET disable_join_reorder=true'
+ sql 'SET enable_fallback_to_original_planner=false'
+ sql "SET ignore_shape_nodes='PhysicalDistribute'"
+ sql "SET detail_shape_nodes='PhysicalProject'"
+ sql 'SET disable_nereids_rules=PRUNE_EMPTY_PARTITION'
+
+ qt_push_down_filter_through_join_1 '''
+ explain shape plan
+ select t1.id, t2.id
+ from t1 join t2
+ where rand() > 0.1
+ '''
+
+ qt_push_down_filter_through_join_2 '''
+ explain shape plan
+ select t1.id, t2.id
+ from t1 join t2
+ where rand() > 0.1 and t1.id = t2.id + 10 and t1.id > 100
+ '''
+
+ qt_push_down_filter_through_join_3 '''
+ explain shape plan
+ select t1.id, t2.id
+ from t1 join t2
+ where t1.id + rand(1, 100) > 100
+ '''
+
+ // sql 'SET disable_join_reorder=false'
+ // Well the reorder join need disable_join_reorder=false,
+ // But if we enable this var, the test is not stable,
+ // the p0 test pipeline may change the two table join order sometimes.
+
+ qt_reorder_join_1 '''
+ explain shape plan
+ select t1.id, t2.id
+ from t1 join t2
+ where t1.id + rand(1, 100) = t2.id and t1.id * 2 = t2.id * 5
+ '''
+
+ qt_reorder_join_2 '''
+ explain shape plan
+ select t1.id, t2.id, t3.id
+ from t1, t2, t2 as t3
+ where t1.id + rand(1, 100) = t3.id and t1.id * 2 = t2.id * 5
+ '''
+
+ // if set disable_join_reorder=false,
+ // | PhysicalResultSink
+ // --filter((random() > 10.0))
|
+ // ----NestedLoopJoin[CROSS_JOIN]
| |
------PhysicalProject
|
+ // --------hashJoin[INNER_JOIN broadcast] hashCondition=((expr_(cast(id
as BIGINT) * 2) = expr_(cast(id as BIGINT) * 5))) otherCondition=() |
+ // ----------PhysicalProject
|
+ // ------------PhysicalOlapScan[t1]
|
+ // ----------PhysicalProject
|
+ // ------------PhysicalOlapScan[t2(t3)]
|
+ // ------PhysicalProject
|
+ // --------PhysicalOlapScan[t2]
|
+ qt_reorder_join_3 '''
+ explain shape plan
+ select t1.id, t2.id, t3.id
+ from t1, t2, t2 as t3
+ where random() > 10 and t1.id * 2 = t3.id * 5
+ '''
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]