This is an automated email from the ASF dual-hosted git repository.

englefly pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 037ff2d5a6 [fix](nereids) bug: runtimefilter should not be pushed 
through window and topN (#24439)
037ff2d5a6 is described below

commit 037ff2d5a601fdd844a6638912de04fdd58ace9f
Author: minghong <[email protected]>
AuthorDate: Tue Sep 19 18:18:06 2023 +0800

    [fix](nereids) bug: runtimefilter should not be pushed through window and 
topN (#24439)
    
    runtime filter should not push down through topN
    runtime filter should not push down through window if target slot is not 
partition key of all windowExpressions
---
 .../processor/post/RuntimeFilterGenerator.java     | 22 ++++++++++++++
 .../doris/nereids/trees/plans/algebra/Window.java  | 34 ++++++++++++++++++++++
 .../nereids/trees/plans/logical/LogicalWindow.java | 33 ---------------------
 .../nereids/postprocess/RuntimeFilterTest.java     | 26 ++++++++++++++---
 4 files changed, 78 insertions(+), 37 deletions(-)

diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/RuntimeFilterGenerator.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/RuntimeFilterGenerator.java
index baa9d928b4..6372657378 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/RuntimeFilterGenerator.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/RuntimeFilterGenerator.java
@@ -47,6 +47,8 @@ import 
org.apache.doris.nereids.trees.plans.physical.PhysicalOneRowRelation;
 import org.apache.doris.nereids.trees.plans.physical.PhysicalPlan;
 import org.apache.doris.nereids.trees.plans.physical.PhysicalProject;
 import org.apache.doris.nereids.trees.plans.physical.PhysicalRelation;
+import org.apache.doris.nereids.trees.plans.physical.PhysicalTopN;
+import org.apache.doris.nereids.trees.plans.physical.PhysicalWindow;
 import org.apache.doris.nereids.trees.plans.physical.RuntimeFilter;
 import org.apache.doris.nereids.util.ExpressionUtils;
 import org.apache.doris.nereids.util.JoinUtils;
@@ -514,6 +516,26 @@ public class RuntimeFilterGenerator extends 
PlanPostProcessor {
         }
     }
 
+    @Override
+    public PhysicalPlan visitPhysicalTopN(PhysicalTopN<? extends Plan> topN, 
CascadesContext context) {
+        topN.child().accept(this, context);
+        PhysicalPlan child = (PhysicalPlan) topN.child();
+        for (Slot slot : child.getOutput()) {
+            
context.getRuntimeFilterContext().getAliasTransferMap().remove(slot);
+        }
+        return topN;
+    }
+
+    @Override
+    public PhysicalPlan visitPhysicalWindow(PhysicalWindow<? extends Plan> 
window, CascadesContext context) {
+        window.child().accept(this, context);
+        Set<SlotReference> commonPartitionKeys = 
window.getCommonPartitionKeyFromWindowExpressions();
+        window.child().getOutput().stream().filter(slot -> 
!commonPartitionKeys.contains(slot)).forEach(
+                slot -> 
context.getRuntimeFilterContext().getAliasTransferMap().remove(slot)
+        );
+        return window;
+    }
+
     /**
      * Check runtime filter push down pre-conditions, such as builder side 
join type, etc.
      */
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/algebra/Window.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/algebra/Window.java
index 35f6547b8a..00d290940e 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/algebra/Window.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/algebra/Window.java
@@ -22,16 +22,24 @@ import org.apache.doris.analysis.Expr;
 import org.apache.doris.nereids.exceptions.AnalysisException;
 import org.apache.doris.nereids.glue.translator.ExpressionTranslator;
 import org.apache.doris.nereids.glue.translator.PlanTranslatorContext;
+import org.apache.doris.nereids.trees.expressions.Alias;
 import org.apache.doris.nereids.trees.expressions.Expression;
 import org.apache.doris.nereids.trees.expressions.NamedExpression;
+import org.apache.doris.nereids.trees.expressions.SlotReference;
+import org.apache.doris.nereids.trees.expressions.WindowExpression;
 import org.apache.doris.nereids.trees.expressions.WindowFrame;
 import org.apache.doris.nereids.trees.expressions.WindowFrame.FrameBoundType;
 import org.apache.doris.nereids.trees.expressions.WindowFrame.FrameBoundary;
 import org.apache.doris.nereids.trees.expressions.WindowFrame.FrameUnitsType;
 import org.apache.doris.nereids.trees.expressions.literal.Literal;
 
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Maps;
+
 import java.math.BigDecimal;
 import java.util.List;
+import java.util.Map;
+import java.util.Set;
 
 /**
  * interface for LogicalWindow and PhysicalWindow
@@ -86,4 +94,30 @@ public interface Window {
         }
     }
 
+    /**
+     *
+     * select rank() over (partition by A, B) as r, sum(x) over(A, C) as s 
from T;
+     * A is a common partition key for all windowExpressions.
+     * for a common Partition key A, we could push filter A=1 through this 
window.
+     */
+    default Set<SlotReference> getCommonPartitionKeyFromWindowExpressions() {
+        ImmutableSet.Builder<SlotReference> commonPartitionKeySet = 
ImmutableSet.builder();
+        Map<Expression, Integer> partitionKeyCount = Maps.newHashMap();
+        for (Expression expr : getWindowExpressions()) {
+            if (expr instanceof Alias && expr.child(0) instanceof 
WindowExpression) {
+                WindowExpression winExpr = (WindowExpression) expr.child(0);
+                for (Expression partitionKey : winExpr.getPartitionKeys()) {
+                    int count = partitionKeyCount.getOrDefault(partitionKey, 
0);
+                    partitionKeyCount.put(partitionKey, count + 1);
+                }
+            }
+        }
+        int winExprCount = getWindowExpressions().size();
+        for (Map.Entry<Expression, Integer> entry : 
partitionKeyCount.entrySet()) {
+            if (entry.getValue() == winExprCount && entry.getKey() instanceof 
SlotReference) {
+                commonPartitionKeySet.add((SlotReference) entry.getKey());
+            }
+        }
+        return commonPartitionKeySet.build();
+    }
 }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalWindow.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalWindow.java
index 67e1819779..fcfe9906d6 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalWindow.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalWindow.java
@@ -19,11 +19,9 @@ package org.apache.doris.nereids.trees.plans.logical;
 
 import org.apache.doris.nereids.memo.GroupExpression;
 import org.apache.doris.nereids.properties.LogicalProperties;
-import org.apache.doris.nereids.trees.expressions.Alias;
 import org.apache.doris.nereids.trees.expressions.Expression;
 import org.apache.doris.nereids.trees.expressions.NamedExpression;
 import org.apache.doris.nereids.trees.expressions.Slot;
-import org.apache.doris.nereids.trees.expressions.SlotReference;
 import org.apache.doris.nereids.trees.expressions.WindowExpression;
 import org.apache.doris.nereids.trees.expressions.WindowFrame;
 import org.apache.doris.nereids.trees.expressions.functions.window.DenseRank;
@@ -38,14 +36,10 @@ import org.apache.doris.qe.ConnectContext;
 
 import com.google.common.base.Preconditions;
 import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableSet;
-import com.google.common.collect.Maps;
 
 import java.util.List;
-import java.util.Map;
 import java.util.Objects;
 import java.util.Optional;
-import java.util.Set;
 
 /**
  * logical node to deal with window functions;
@@ -230,31 +224,4 @@ public class LogicalWindow<CHILD_TYPE extends Plan> 
extends LogicalUnary<CHILD_T
 
         return Optional.ofNullable(window);
     }
-
-    /**
-     *
-     * select rank() over (partition by A, B) as r, sum(x) over(A, C) as s 
from T;
-     * A is a common partition key for all windowExpressions.
-     * for a common Partition key A, we could push filter A=1 through this 
window.
-     */
-    public Set<SlotReference> getCommonPartitionKeyFromWindowExpressions() {
-        ImmutableSet.Builder<SlotReference> commonPartitionKeySet = 
ImmutableSet.builder();
-        Map<Expression, Integer> partitionKeyCount = Maps.newHashMap();
-        for (Expression expr : windowExpressions) {
-            if (expr instanceof Alias && expr.child(0) instanceof 
WindowExpression) {
-                WindowExpression winExpr = (WindowExpression) expr.child(0);
-                for (Expression partitionKey : winExpr.getPartitionKeys()) {
-                    int count = partitionKeyCount.getOrDefault(partitionKey, 
0);
-                    partitionKeyCount.put(partitionKey, count + 1);
-                }
-            }
-        }
-        int winExprCount = windowExpressions.size();
-        for (Map.Entry<Expression, Integer> entry : 
partitionKeyCount.entrySet()) {
-            if (entry.getValue() == winExprCount && entry.getKey() instanceof 
SlotReference) {
-                commonPartitionKeySet.add((SlotReference) entry.getKey());
-            }
-        }
-        return commonPartitionKeySet.build();
-    }
 }
diff --git 
a/fe/fe-core/src/test/java/org/apache/doris/nereids/postprocess/RuntimeFilterTest.java
 
b/fe/fe-core/src/test/java/org/apache/doris/nereids/postprocess/RuntimeFilterTest.java
index 074a417c10..a2f9b15ac4 100644
--- 
a/fe/fe-core/src/test/java/org/apache/doris/nereids/postprocess/RuntimeFilterTest.java
+++ 
b/fe/fe-core/src/test/java/org/apache/doris/nereids/postprocess/RuntimeFilterTest.java
@@ -317,10 +317,28 @@ public class RuntimeFilterTest extends SSBTestBase {
                     filter.getSrcExpr().toSql(),
                     
filter.getTargetExprs().stream().collect(Collectors.toSet())
             ));
-            // Set<String> targets = 
srcTargets.get(filter.getSrcExpr().toSql());
-            // Assertions.assertNotNull(targets);
-            // targets.containsAll(
-            //         filter.getTargetExprs().stream().map(expr -> 
expr.toSql()).collect(Collectors.toList()));
         }
     }
+
+    @Test
+    public void testRuntimeFilterBlockByWindow() {
+        String sql = "SELECT * FROM (select rank() over(partition by 
lo_partkey), lo_custkey from lineorder) t JOIN customer on lo_custkey = 
c_custkey";
+        List<RuntimeFilter> filters = getRuntimeFilters(sql).get();
+        Assertions.assertEquals(0, filters.size());
+    }
+
+    @Test
+    public void testRuntimeFilterNotBlockByWindow() {
+        String sql = "SELECT * FROM (select rank() over(partition by 
lo_custkey), lo_custkey from lineorder) t JOIN customer on lo_custkey = 
c_custkey";
+        List<RuntimeFilter> filters = getRuntimeFilters(sql).get();
+        Assertions.assertEquals(1, filters.size());
+    }
+
+    @Test
+    public void testRuntimeFilterBlockByTopN() {
+        String sql = "SELECT * FROM (select lo_custkey from lineorder order by 
lo_custkey limit 10) t JOIN customer on lo_custkey = c_custkey";
+        List<RuntimeFilter> filters = getRuntimeFilters(sql).get();
+        Assertions.assertEquals(0, filters.size());
+    }
+
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to