This is an automated email from the ASF dual-hosted git repository.

morrysnow pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 5f591a6d12 [opt](nereids) generate in-bloom filter if target is local 
for pipeline mode (#20112)
5f591a6d12 is described below

commit 5f591a6d1234a71b4de4a2487737a86f949236a9
Author: minghong <[email protected]>
AuthorDate: Wed May 31 17:24:38 2023 +0800

    [opt](nereids) generate in-bloom filter if target is local for pipeline 
mode (#20112)
    
    update in-filter usage in pipeline mode:
    1. if the target is local, we use the in-or-bloom filter and let the BE choose between an IN filter and a bloom filter according to the actual number of distinct values
    2. set the default runtime_filter_max_in_num to 1024
---
 .../nereids/processor/post/FragmentProcessor.java  | 52 ++++++++++++++++++++++
 .../nereids/processor/post/PlanPostProcessors.java |  1 +
 .../processor/post/RuntimeFilterContext.java       |  5 ++-
 .../processor/post/RuntimeFilterGenerator.java     | 36 ++++++++++-----
 .../doris/nereids/trees/plans/AbstractPlan.java    |  1 +
 .../trees/plans/physical/PhysicalHashJoin.java     |  4 +-
 .../trees/plans/physical/PhysicalOlapScan.java     |  3 +-
 .../java/org/apache/doris/qe/SessionVariable.java  |  2 +-
 8 files changed, 88 insertions(+), 16 deletions(-)

diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/FragmentProcessor.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/FragmentProcessor.java
new file mode 100644
index 0000000000..ddc0e76067
--- /dev/null
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/FragmentProcessor.java
@@ -0,0 +1,52 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.nereids.processor.post;
+
+import org.apache.doris.nereids.CascadesContext;
+import org.apache.doris.nereids.trees.plans.AbstractPlan;
+import org.apache.doris.nereids.trees.plans.Plan;
+import org.apache.doris.nereids.trees.plans.physical.PhysicalDistribute;
+import org.apache.doris.nereids.trees.plans.physical.PhysicalHashJoin;
+import org.apache.doris.nereids.trees.plans.physical.PhysicalOlapScan;
+
+/**
+ * generate fragment id for nereids physical plan
+ */
+public class FragmentProcessor extends PlanPostProcessor {
+    private int frId = 0;
+
+    public PhysicalDistribute visitPhysicalDistribute(PhysicalDistribute<? 
extends Plan> distribute,
+            CascadesContext ctx) {
+        frId++;
+        distribute.child().accept(this, ctx);
+        return distribute;
+    }
+
+    public PhysicalHashJoin visitPhysicalHashJoin(PhysicalHashJoin<? extends 
Plan, ? extends Plan> join,
+            CascadesContext ctx) {
+        join.setMutableState(AbstractPlan.FRAGMENT_ID, frId);
+        join.left().accept(this, ctx);
+        join.right().accept(this, ctx);
+        return join;
+    }
+
+    public PhysicalOlapScan visitPhysicalOlapScan(PhysicalOlapScan scan, 
CascadesContext ctx) {
+        scan.setMutableState(AbstractPlan.FRAGMENT_ID, frId);
+        return scan;
+    }
+}
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/PlanPostProcessors.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/PlanPostProcessors.java
index b53d316f1a..4e72b8738d 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/PlanPostProcessors.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/PlanPostProcessors.java
@@ -60,6 +60,7 @@ public class PlanPostProcessors {
         Builder<PlanPostProcessor> builder = ImmutableList.builder();
         builder.add(new MergeProjectPostProcessor());
         builder.add(new PushdownFilterThroughProject());
+        builder.add(new FragmentProcessor());
         if 
(!cascadesContext.getConnectContext().getSessionVariable().getRuntimeFilterMode()
                         .toUpperCase().equals(TRuntimeFilterMode.OFF.name())) {
             builder.add(new RuntimeFilterGenerator());
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/RuntimeFilterContext.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/RuntimeFilterContext.java
index 70b32d50ee..69986cde67 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/RuntimeFilterContext.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/RuntimeFilterContext.java
@@ -28,6 +28,7 @@ import org.apache.doris.nereids.trees.plans.ObjectId;
 import org.apache.doris.nereids.trees.plans.Plan;
 import org.apache.doris.nereids.trees.plans.physical.AbstractPhysicalJoin;
 import org.apache.doris.nereids.trees.plans.physical.PhysicalHashJoin;
+import org.apache.doris.nereids.trees.plans.physical.PhysicalRelation;
 import org.apache.doris.nereids.trees.plans.physical.RuntimeFilter;
 import org.apache.doris.planner.RuntimeFilterGenerator.FilterSizeLimits;
 import org.apache.doris.planner.RuntimeFilterId;
@@ -71,7 +72,7 @@ public class RuntimeFilterContext {
     // alias -> alias's child, if there's a key that is alias's child, the 
key-value will change by this way
     // Alias(A) = B, now B -> A in map, and encounter Alias(B) -> C, the kv 
will be C -> A.
     // you can see disjoint set data structure to learn the processing 
detailed.
-    private final Map<NamedExpression, Pair<ObjectId, Slot>> aliasTransferMap 
= Maps.newHashMap();
+    private final Map<NamedExpression, Pair<PhysicalRelation, Slot>> 
aliasTransferMap = Maps.newHashMap();
 
     private final Map<Slot, ScanNode> scanNodeOfLegacyRuntimeFilterTarget = 
Maps.newHashMap();
 
@@ -126,7 +127,7 @@ public class RuntimeFilterContext {
         return exprIdToOlapScanNodeSlotRef;
     }
 
-    public Map<NamedExpression, Pair<ObjectId, Slot>> getAliasTransferMap() {
+    public Map<NamedExpression, Pair<PhysicalRelation, Slot>> 
getAliasTransferMap() {
         return aliasTransferMap;
     }
 
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/RuntimeFilterGenerator.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/RuntimeFilterGenerator.java
index c070e75641..bb0272c3a7 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/RuntimeFilterGenerator.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/RuntimeFilterGenerator.java
@@ -30,7 +30,6 @@ import org.apache.doris.nereids.trees.expressions.Slot;
 import 
org.apache.doris.nereids.trees.expressions.functions.scalar.BitmapContains;
 import org.apache.doris.nereids.trees.plans.AbstractPlan;
 import org.apache.doris.nereids.trees.plans.JoinType;
-import org.apache.doris.nereids.trees.plans.ObjectId;
 import org.apache.doris.nereids.trees.plans.Plan;
 import org.apache.doris.nereids.trees.plans.physical.PhysicalHashJoin;
 import org.apache.doris.nereids.trees.plans.physical.PhysicalNestedLoopJoin;
@@ -44,6 +43,7 @@ import org.apache.doris.planner.RuntimeFilterId;
 import org.apache.doris.statistics.ColumnStatistic;
 import org.apache.doris.thrift.TRuntimeFilterType;
 
+import com.google.common.base.Preconditions;
 import com.google.common.collect.ImmutableSet;
 
 import java.util.Arrays;
@@ -85,7 +85,7 @@ public class RuntimeFilterGenerator extends PlanPostProcessor 
{
     public PhysicalPlan visitPhysicalHashJoin(PhysicalHashJoin<? extends Plan, 
? extends Plan> join,
             CascadesContext context) {
         RuntimeFilterContext ctx = context.getRuntimeFilterContext();
-        Map<NamedExpression, Pair<ObjectId, Slot>> aliasTransferMap = 
ctx.getAliasTransferMap();
+        Map<NamedExpression, Pair<PhysicalRelation, Slot>> aliasTransferMap = 
ctx.getAliasTransferMap();
         join.right().accept(this, context);
         join.left().accept(this, context);
         if (DENIED_JOIN_TYPES.contains(join.getJoinType()) || 
join.isMarkJoin()) {
@@ -105,10 +105,6 @@ public class RuntimeFilterGenerator extends 
PlanPostProcessor {
                     if (type == TRuntimeFilterType.BITMAP) {
                         continue;
                     }
-                    // in-filter is not friendly to pipeline
-                    if (type == TRuntimeFilterType.IN_OR_BLOOM && 
ctx.getSessionVariable().enablePipelineEngine()) {
-                        type = TRuntimeFilterType.BLOOM;
-                    }
                     // currently, we can ensure children in the two side are 
corresponding to the equal_to's.
                     // so right maybe an expression and left is a slot
                     Slot unwrappedSlot = checkTargetChild(equalTo.left());
@@ -118,18 +114,35 @@ public class RuntimeFilterGenerator extends 
PlanPostProcessor {
                         continue;
                     }
                     Slot olapScanSlot = 
aliasTransferMap.get(unwrappedSlot).second;
+                    PhysicalRelation scan = 
aliasTransferMap.get(unwrappedSlot).first;
+                    // in-filter is not friendly to pipeline
+                    if (type == TRuntimeFilterType.IN_OR_BLOOM
+                            && ctx.getSessionVariable().enablePipelineEngine()
+                            && hasRemoteTarget(join, scan)) {
+                        type = TRuntimeFilterType.BLOOM;
+                    }
+
                     long buildSideNdv = getBuildSideNdv(join, equalTo);
                     RuntimeFilter filter = new 
RuntimeFilter(generator.getNextId(),
                             equalTo.right(), olapScanSlot, type, i, join, 
buildSideNdv);
                     ctx.addJoinToTargetMap(join, olapScanSlot.getExprId());
                     ctx.setTargetExprIdToFilter(olapScanSlot.getExprId(), 
filter);
-                    
ctx.setTargetsOnScanNode(aliasTransferMap.get(unwrappedSlot).first, 
olapScanSlot);
+                    
ctx.setTargetsOnScanNode(aliasTransferMap.get(unwrappedSlot).first.getId(), 
olapScanSlot);
                 }
             }
         }
         return join;
     }
 
+    private boolean hasRemoteTarget(AbstractPlan join, AbstractPlan scan) {
+        
Preconditions.checkArgument(join.getMutableState(AbstractPlan.FRAGMENT_ID).isPresent(),
+                "cannot find fragment id for Join node");
+        
Preconditions.checkArgument(scan.getMutableState(AbstractPlan.FRAGMENT_ID).isPresent(),
+                "cannot find fragment id for scan node");
+        return join.getMutableState(AbstractPlan.FRAGMENT_ID).get()
+                != scan.getMutableState(AbstractPlan.FRAGMENT_ID).get();
+    }
+
     private long getBuildSideNdv(PhysicalHashJoin<? extends Plan, ? extends 
Plan> join, EqualTo equalTo) {
         AbstractPlan right = (AbstractPlan) join.right();
         //make ut test friendly
@@ -151,7 +164,7 @@ public class RuntimeFilterGenerator extends 
PlanPostProcessor {
             return join;
         }
         RuntimeFilterContext ctx = context.getRuntimeFilterContext();
-        Map<NamedExpression, Pair<ObjectId, Slot>> aliasTransferMap = 
ctx.getAliasTransferMap();
+        Map<NamedExpression, Pair<PhysicalRelation, Slot>> aliasTransferMap = 
ctx.getAliasTransferMap();
 
         if ((ctx.getSessionVariable().getRuntimeFilterType() & 
TRuntimeFilterType.BITMAP.getValue()) == 0) {
             //only generate BITMAP filter for nested loop join
@@ -185,7 +198,8 @@ public class RuntimeFilterGenerator extends 
PlanPostProcessor {
                             bitmapContains.child(1), type, i, join, isNot, 
-1L);
                     ctx.addJoinToTargetMap(join, olapScanSlot.getExprId());
                     ctx.setTargetExprIdToFilter(olapScanSlot.getExprId(), 
filter);
-                    
ctx.setTargetsOnScanNode(aliasTransferMap.get(targetSlot).first, olapScanSlot);
+                    
ctx.setTargetsOnScanNode(aliasTransferMap.get(targetSlot).first.getId(),
+                            olapScanSlot);
                     
join.addBitmapRuntimeFilterCondition(bitmapRuntimeFilterCondition);
                 }
             }
@@ -196,7 +210,7 @@ public class RuntimeFilterGenerator extends 
PlanPostProcessor {
     @Override
     public PhysicalPlan visitPhysicalProject(PhysicalProject<? extends Plan> 
project, CascadesContext context) {
         project.child().accept(this, context);
-        Map<NamedExpression, Pair<ObjectId, Slot>> aliasTransferMap
+        Map<NamedExpression, Pair<PhysicalRelation, Slot>> aliasTransferMap
                 = context.getRuntimeFilterContext().getAliasTransferMap();
         // change key when encounter alias.
         for (Expression expression : project.getProjects()) {
@@ -218,7 +232,7 @@ public class RuntimeFilterGenerator extends 
PlanPostProcessor {
     public PhysicalRelation visitPhysicalScan(PhysicalRelation scan, 
CascadesContext context) {
         // add all the slots in map.
         RuntimeFilterContext ctx = context.getRuntimeFilterContext();
-        scan.getOutput().forEach(slot -> ctx.getAliasTransferMap().put(slot, 
Pair.of(scan.getId(), slot)));
+        scan.getOutput().forEach(slot -> ctx.getAliasTransferMap().put(slot, 
Pair.of(scan, slot)));
         return scan;
     }
 
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/AbstractPlan.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/AbstractPlan.java
index a273daf114..f3d545c8ea 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/AbstractPlan.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/AbstractPlan.java
@@ -52,6 +52,7 @@ import javax.annotation.Nullable;
  * Abstract class for all concrete plan node.
  */
 public abstract class AbstractPlan extends AbstractTreeNode<Plan> implements 
Plan {
+    public static final String FRAGMENT_ID = "fragment";
     private static final EventProducer PLAN_CONSTRUCT_TRACER = new 
EventProducer(CounterEvent.class,
             EventChannel.getDefaultChannel()
                     .addEnhancers(new AddCounterEventEnhancer())
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalHashJoin.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalHashJoin.java
index 013a8818a9..7a75383868 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalHashJoin.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalHashJoin.java
@@ -24,6 +24,7 @@ import org.apache.doris.nereids.properties.PhysicalProperties;
 import org.apache.doris.nereids.trees.expressions.ExprId;
 import org.apache.doris.nereids.trees.expressions.Expression;
 import org.apache.doris.nereids.trees.expressions.MarkJoinSlotReference;
+import org.apache.doris.nereids.trees.plans.AbstractPlan;
 import org.apache.doris.nereids.trees.plans.JoinHint;
 import org.apache.doris.nereids.trees.plans.JoinType;
 import org.apache.doris.nereids.trees.plans.Plan;
@@ -132,7 +133,8 @@ public class PhysicalHashJoin<
         List<Object> args = Lists.newArrayList("type", joinType,
                 "hashJoinCondition", hashJoinConjuncts,
                 "otherJoinCondition", otherJoinConjuncts,
-                "stats", statistics);
+                "stats", statistics,
+                "fr", getMutableState(AbstractPlan.FRAGMENT_ID));
         if (markJoinSlotReference.isPresent()) {
             args.add("isMarkJoin");
             args.add("true");
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalOlapScan.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalOlapScan.java
index fc484bf6c6..ab5f74550b 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalOlapScan.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalOlapScan.java
@@ -23,6 +23,7 @@ import org.apache.doris.nereids.properties.DistributionSpec;
 import org.apache.doris.nereids.properties.LogicalProperties;
 import org.apache.doris.nereids.properties.PhysicalProperties;
 import org.apache.doris.nereids.trees.expressions.Slot;
+import org.apache.doris.nereids.trees.plans.AbstractPlan;
 import org.apache.doris.nereids.trees.plans.ObjectId;
 import org.apache.doris.nereids.trees.plans.PlanType;
 import org.apache.doris.nereids.trees.plans.PreAggStatus;
@@ -119,7 +120,7 @@ public class PhysicalOlapScan extends PhysicalRelation 
implements OlapScan {
     public String toString() {
         return Utils.toSqlString("PhysicalOlapScan[" + id.asInt() + "]" + 
getGroupIdAsString(),
                 "qualified", Utils.qualifiedName(qualifier, 
olapTable.getName()),
-                "stats", statistics
+                "stats", statistics, "fr", 
getMutableState(AbstractPlan.FRAGMENT_ID)
         );
     }
 
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java 
b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
index 00ed7a501d..f26e5fc47e 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
@@ -639,7 +639,7 @@ public class SessionVariable implements Serializable, 
Writable {
     private int runtimeFilterType = 8;
 
     @VariableMgr.VarAttr(name = RUNTIME_FILTER_MAX_IN_NUM)
-    private int runtimeFilterMaxInNum = 102400;
+    private int runtimeFilterMaxInNum = 1024;
 
     @VariableMgr.VarAttr(name = USE_RF_DEFAULT)
     public boolean useRuntimeFilterDefaultSize = false;


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to