This is an automated email from the ASF dual-hosted git repository.
englefly pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 037ff2d5a6 [fix](nereids) bug: runtimefilter should not be pushed
through window and topN (#24439)
037ff2d5a6 is described below
commit 037ff2d5a601fdd844a6638912de04fdd58ace9f
Author: minghong <[email protected]>
AuthorDate: Tue Sep 19 18:18:06 2023 +0800
[fix](nereids) bug: runtimefilter should not be pushed through window and
topN (#24439)
Runtime filters should not be pushed down through topN.
Runtime filters should not be pushed down through a window unless the target
slot is a partition key of every window expression.
---
.../processor/post/RuntimeFilterGenerator.java | 22 ++++++++++++++
.../doris/nereids/trees/plans/algebra/Window.java | 34 ++++++++++++++++++++++
.../nereids/trees/plans/logical/LogicalWindow.java | 33 ---------------------
.../nereids/postprocess/RuntimeFilterTest.java | 26 ++++++++++++++---
4 files changed, 78 insertions(+), 37 deletions(-)
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/RuntimeFilterGenerator.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/RuntimeFilterGenerator.java
index baa9d928b4..6372657378 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/RuntimeFilterGenerator.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/RuntimeFilterGenerator.java
@@ -47,6 +47,8 @@ import
org.apache.doris.nereids.trees.plans.physical.PhysicalOneRowRelation;
import org.apache.doris.nereids.trees.plans.physical.PhysicalPlan;
import org.apache.doris.nereids.trees.plans.physical.PhysicalProject;
import org.apache.doris.nereids.trees.plans.physical.PhysicalRelation;
+import org.apache.doris.nereids.trees.plans.physical.PhysicalTopN;
+import org.apache.doris.nereids.trees.plans.physical.PhysicalWindow;
import org.apache.doris.nereids.trees.plans.physical.RuntimeFilter;
import org.apache.doris.nereids.util.ExpressionUtils;
import org.apache.doris.nereids.util.JoinUtils;
@@ -514,6 +516,26 @@ public class RuntimeFilterGenerator extends
PlanPostProcessor {
}
}
+ @Override
+ public PhysicalPlan visitPhysicalTopN(PhysicalTopN<? extends Plan> topN,
CascadesContext context) {
+ topN.child().accept(this, context);
+ PhysicalPlan child = (PhysicalPlan) topN.child();
+ for (Slot slot : child.getOutput()) {
+
context.getRuntimeFilterContext().getAliasTransferMap().remove(slot);
+ }
+ return topN;
+ }
+
+ @Override
+ public PhysicalPlan visitPhysicalWindow(PhysicalWindow<? extends Plan>
window, CascadesContext context) {
+ window.child().accept(this, context);
+ Set<SlotReference> commonPartitionKeys =
window.getCommonPartitionKeyFromWindowExpressions();
+ window.child().getOutput().stream().filter(slot ->
!commonPartitionKeys.contains(slot)).forEach(
+ slot ->
context.getRuntimeFilterContext().getAliasTransferMap().remove(slot)
+ );
+ return window;
+ }
+
/**
* Check runtime filter push down pre-conditions, such as builder side
join type, etc.
*/
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/algebra/Window.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/algebra/Window.java
index 35f6547b8a..00d290940e 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/algebra/Window.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/algebra/Window.java
@@ -22,16 +22,24 @@ import org.apache.doris.analysis.Expr;
import org.apache.doris.nereids.exceptions.AnalysisException;
import org.apache.doris.nereids.glue.translator.ExpressionTranslator;
import org.apache.doris.nereids.glue.translator.PlanTranslatorContext;
+import org.apache.doris.nereids.trees.expressions.Alias;
import org.apache.doris.nereids.trees.expressions.Expression;
import org.apache.doris.nereids.trees.expressions.NamedExpression;
+import org.apache.doris.nereids.trees.expressions.SlotReference;
+import org.apache.doris.nereids.trees.expressions.WindowExpression;
import org.apache.doris.nereids.trees.expressions.WindowFrame;
import org.apache.doris.nereids.trees.expressions.WindowFrame.FrameBoundType;
import org.apache.doris.nereids.trees.expressions.WindowFrame.FrameBoundary;
import org.apache.doris.nereids.trees.expressions.WindowFrame.FrameUnitsType;
import org.apache.doris.nereids.trees.expressions.literal.Literal;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Maps;
+
import java.math.BigDecimal;
import java.util.List;
+import java.util.Map;
+import java.util.Set;
/**
* interface for LogicalWindow and PhysicalWindow
@@ -86,4 +94,30 @@ public interface Window {
}
}
+ /**
+ *
+ * select rank() over (partition by A, B) as r, sum(x) over(partition by A, C) as s
from T;
+ * A is a common partition key for all windowExpressions.
+ * for a common Partition key A, we could push filter A=1 through this
window.
+ */
+ default Set<SlotReference> getCommonPartitionKeyFromWindowExpressions() {
+ ImmutableSet.Builder<SlotReference> commonPartitionKeySet =
ImmutableSet.builder();
+ Map<Expression, Integer> partitionKeyCount = Maps.newHashMap();
+ for (Expression expr : getWindowExpressions()) {
+ if (expr instanceof Alias && expr.child(0) instanceof
WindowExpression) {
+ WindowExpression winExpr = (WindowExpression) expr.child(0);
+ for (Expression partitionKey : winExpr.getPartitionKeys()) {
+ int count = partitionKeyCount.getOrDefault(partitionKey,
0);
+ partitionKeyCount.put(partitionKey, count + 1);
+ }
+ }
+ }
+ int winExprCount = getWindowExpressions().size();
+ for (Map.Entry<Expression, Integer> entry :
partitionKeyCount.entrySet()) {
+ if (entry.getValue() == winExprCount && entry.getKey() instanceof
SlotReference) {
+ commonPartitionKeySet.add((SlotReference) entry.getKey());
+ }
+ }
+ return commonPartitionKeySet.build();
+ }
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalWindow.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalWindow.java
index 67e1819779..fcfe9906d6 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalWindow.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalWindow.java
@@ -19,11 +19,9 @@ package org.apache.doris.nereids.trees.plans.logical;
import org.apache.doris.nereids.memo.GroupExpression;
import org.apache.doris.nereids.properties.LogicalProperties;
-import org.apache.doris.nereids.trees.expressions.Alias;
import org.apache.doris.nereids.trees.expressions.Expression;
import org.apache.doris.nereids.trees.expressions.NamedExpression;
import org.apache.doris.nereids.trees.expressions.Slot;
-import org.apache.doris.nereids.trees.expressions.SlotReference;
import org.apache.doris.nereids.trees.expressions.WindowExpression;
import org.apache.doris.nereids.trees.expressions.WindowFrame;
import org.apache.doris.nereids.trees.expressions.functions.window.DenseRank;
@@ -38,14 +36,10 @@ import org.apache.doris.qe.ConnectContext;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableSet;
-import com.google.common.collect.Maps;
import java.util.List;
-import java.util.Map;
import java.util.Objects;
import java.util.Optional;
-import java.util.Set;
/**
* logical node to deal with window functions;
@@ -230,31 +224,4 @@ public class LogicalWindow<CHILD_TYPE extends Plan>
extends LogicalUnary<CHILD_T
return Optional.ofNullable(window);
}
-
- /**
- *
- * select rank() over (partition by A, B) as r, sum(x) over(A, C) as s
from T;
- * A is a common partition key for all windowExpressions.
- * for a common Partition key A, we could push filter A=1 through this
window.
- */
- public Set<SlotReference> getCommonPartitionKeyFromWindowExpressions() {
- ImmutableSet.Builder<SlotReference> commonPartitionKeySet =
ImmutableSet.builder();
- Map<Expression, Integer> partitionKeyCount = Maps.newHashMap();
- for (Expression expr : windowExpressions) {
- if (expr instanceof Alias && expr.child(0) instanceof
WindowExpression) {
- WindowExpression winExpr = (WindowExpression) expr.child(0);
- for (Expression partitionKey : winExpr.getPartitionKeys()) {
- int count = partitionKeyCount.getOrDefault(partitionKey,
0);
- partitionKeyCount.put(partitionKey, count + 1);
- }
- }
- }
- int winExprCount = windowExpressions.size();
- for (Map.Entry<Expression, Integer> entry :
partitionKeyCount.entrySet()) {
- if (entry.getValue() == winExprCount && entry.getKey() instanceof
SlotReference) {
- commonPartitionKeySet.add((SlotReference) entry.getKey());
- }
- }
- return commonPartitionKeySet.build();
- }
}
diff --git
a/fe/fe-core/src/test/java/org/apache/doris/nereids/postprocess/RuntimeFilterTest.java
b/fe/fe-core/src/test/java/org/apache/doris/nereids/postprocess/RuntimeFilterTest.java
index 074a417c10..a2f9b15ac4 100644
---
a/fe/fe-core/src/test/java/org/apache/doris/nereids/postprocess/RuntimeFilterTest.java
+++
b/fe/fe-core/src/test/java/org/apache/doris/nereids/postprocess/RuntimeFilterTest.java
@@ -317,10 +317,28 @@ public class RuntimeFilterTest extends SSBTestBase {
filter.getSrcExpr().toSql(),
filter.getTargetExprs().stream().collect(Collectors.toSet())
));
- // Set<String> targets =
srcTargets.get(filter.getSrcExpr().toSql());
- // Assertions.assertNotNull(targets);
- // targets.containsAll(
- // filter.getTargetExprs().stream().map(expr ->
expr.toSql()).collect(Collectors.toList()));
}
}
+
+ @Test
+ public void testRuntimeFilterBlockByWindow() {
+ String sql = "SELECT * FROM (select rank() over(partition by
lo_partkey), lo_custkey from lineorder) t JOIN customer on lo_custkey =
c_custkey";
+ List<RuntimeFilter> filters = getRuntimeFilters(sql).get();
+ Assertions.assertEquals(0, filters.size());
+ }
+
+ @Test
+ public void testRuntimeFilterNotBlockByWindow() {
+ String sql = "SELECT * FROM (select rank() over(partition by
lo_custkey), lo_custkey from lineorder) t JOIN customer on lo_custkey =
c_custkey";
+ List<RuntimeFilter> filters = getRuntimeFilters(sql).get();
+ Assertions.assertEquals(1, filters.size());
+ }
+
+ @Test
+ public void testRuntimeFilterBlockByTopN() {
+ String sql = "SELECT * FROM (select lo_custkey from lineorder order by
lo_custkey limit 10) t JOIN customer on lo_custkey = c_custkey";
+ List<RuntimeFilter> filters = getRuntimeFilters(sql).get();
+ Assertions.assertEquals(0, filters.size());
+ }
+
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]