This is an automated email from the ASF dual-hosted git repository.
morrysnow pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 5f591a6d12 [opt](nereids) generate in-bloom filter if target is local
for pipeline mode (#20112)
5f591a6d12 is described below
commit 5f591a6d1234a71b4de4a2487737a86f949236a9
Author: minghong <[email protected]>
AuthorDate: Wed May 31 17:24:38 2023 +0800
[opt](nereids) generate in-bloom filter if target is local for pipeline
mode (#20112)
Update runtime in-filter usage in pipeline mode:
1. If the filter target is local, use the in-or-bloom filter type and let BE
choose between in and bloom according to the actual number of distinct values.
2. Set the default runtime_filter_max_in_num to 1024.
---
.../nereids/processor/post/FragmentProcessor.java | 52 ++++++++++++++++++++++
.../nereids/processor/post/PlanPostProcessors.java | 1 +
.../processor/post/RuntimeFilterContext.java | 5 ++-
.../processor/post/RuntimeFilterGenerator.java | 36 ++++++++++-----
.../doris/nereids/trees/plans/AbstractPlan.java | 1 +
.../trees/plans/physical/PhysicalHashJoin.java | 4 +-
.../trees/plans/physical/PhysicalOlapScan.java | 3 +-
.../java/org/apache/doris/qe/SessionVariable.java | 2 +-
8 files changed, 88 insertions(+), 16 deletions(-)
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/FragmentProcessor.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/FragmentProcessor.java
new file mode 100644
index 0000000000..ddc0e76067
--- /dev/null
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/FragmentProcessor.java
@@ -0,0 +1,52 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.nereids.processor.post;
+
+import org.apache.doris.nereids.CascadesContext;
+import org.apache.doris.nereids.trees.plans.AbstractPlan;
+import org.apache.doris.nereids.trees.plans.Plan;
+import org.apache.doris.nereids.trees.plans.physical.PhysicalDistribute;
+import org.apache.doris.nereids.trees.plans.physical.PhysicalHashJoin;
+import org.apache.doris.nereids.trees.plans.physical.PhysicalOlapScan;
+
+/**
+ * generate fragment id for nereids physical plan
+ */
+public class FragmentProcessor extends PlanPostProcessor {
+ private int frId = 0;
+
+ public PhysicalDistribute visitPhysicalDistribute(PhysicalDistribute<?
extends Plan> distribute,
+ CascadesContext ctx) {
+ frId++;
+ distribute.child().accept(this, ctx);
+ return distribute;
+ }
+
+ public PhysicalHashJoin visitPhysicalHashJoin(PhysicalHashJoin<? extends
Plan, ? extends Plan> join,
+ CascadesContext ctx) {
+ join.setMutableState(AbstractPlan.FRAGMENT_ID, frId);
+ join.left().accept(this, ctx);
+ join.right().accept(this, ctx);
+ return join;
+ }
+
+ public PhysicalOlapScan visitPhysicalOlapScan(PhysicalOlapScan scan,
CascadesContext ctx) {
+ scan.setMutableState(AbstractPlan.FRAGMENT_ID, frId);
+ return scan;
+ }
+}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/PlanPostProcessors.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/PlanPostProcessors.java
index b53d316f1a..4e72b8738d 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/PlanPostProcessors.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/PlanPostProcessors.java
@@ -60,6 +60,7 @@ public class PlanPostProcessors {
Builder<PlanPostProcessor> builder = ImmutableList.builder();
builder.add(new MergeProjectPostProcessor());
builder.add(new PushdownFilterThroughProject());
+ builder.add(new FragmentProcessor());
if
(!cascadesContext.getConnectContext().getSessionVariable().getRuntimeFilterMode()
.toUpperCase().equals(TRuntimeFilterMode.OFF.name())) {
builder.add(new RuntimeFilterGenerator());
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/RuntimeFilterContext.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/RuntimeFilterContext.java
index 70b32d50ee..69986cde67 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/RuntimeFilterContext.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/RuntimeFilterContext.java
@@ -28,6 +28,7 @@ import org.apache.doris.nereids.trees.plans.ObjectId;
import org.apache.doris.nereids.trees.plans.Plan;
import org.apache.doris.nereids.trees.plans.physical.AbstractPhysicalJoin;
import org.apache.doris.nereids.trees.plans.physical.PhysicalHashJoin;
+import org.apache.doris.nereids.trees.plans.physical.PhysicalRelation;
import org.apache.doris.nereids.trees.plans.physical.RuntimeFilter;
import org.apache.doris.planner.RuntimeFilterGenerator.FilterSizeLimits;
import org.apache.doris.planner.RuntimeFilterId;
@@ -71,7 +72,7 @@ public class RuntimeFilterContext {
// alias -> alias's child, if there's a key that is alias's child, the
key-value will change by this way
// Alias(A) = B, now B -> A in map, and encounter Alias(B) -> C, the kv
will be C -> A.
// you can see disjoint set data structure to learn the processing
detailed.
- private final Map<NamedExpression, Pair<ObjectId, Slot>> aliasTransferMap
= Maps.newHashMap();
+ private final Map<NamedExpression, Pair<PhysicalRelation, Slot>>
aliasTransferMap = Maps.newHashMap();
private final Map<Slot, ScanNode> scanNodeOfLegacyRuntimeFilterTarget =
Maps.newHashMap();
@@ -126,7 +127,7 @@ public class RuntimeFilterContext {
return exprIdToOlapScanNodeSlotRef;
}
- public Map<NamedExpression, Pair<ObjectId, Slot>> getAliasTransferMap() {
+ public Map<NamedExpression, Pair<PhysicalRelation, Slot>>
getAliasTransferMap() {
return aliasTransferMap;
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/RuntimeFilterGenerator.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/RuntimeFilterGenerator.java
index c070e75641..bb0272c3a7 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/RuntimeFilterGenerator.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/RuntimeFilterGenerator.java
@@ -30,7 +30,6 @@ import org.apache.doris.nereids.trees.expressions.Slot;
import
org.apache.doris.nereids.trees.expressions.functions.scalar.BitmapContains;
import org.apache.doris.nereids.trees.plans.AbstractPlan;
import org.apache.doris.nereids.trees.plans.JoinType;
-import org.apache.doris.nereids.trees.plans.ObjectId;
import org.apache.doris.nereids.trees.plans.Plan;
import org.apache.doris.nereids.trees.plans.physical.PhysicalHashJoin;
import org.apache.doris.nereids.trees.plans.physical.PhysicalNestedLoopJoin;
@@ -44,6 +43,7 @@ import org.apache.doris.planner.RuntimeFilterId;
import org.apache.doris.statistics.ColumnStatistic;
import org.apache.doris.thrift.TRuntimeFilterType;
+import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableSet;
import java.util.Arrays;
@@ -85,7 +85,7 @@ public class RuntimeFilterGenerator extends PlanPostProcessor
{
public PhysicalPlan visitPhysicalHashJoin(PhysicalHashJoin<? extends Plan,
? extends Plan> join,
CascadesContext context) {
RuntimeFilterContext ctx = context.getRuntimeFilterContext();
- Map<NamedExpression, Pair<ObjectId, Slot>> aliasTransferMap =
ctx.getAliasTransferMap();
+ Map<NamedExpression, Pair<PhysicalRelation, Slot>> aliasTransferMap =
ctx.getAliasTransferMap();
join.right().accept(this, context);
join.left().accept(this, context);
if (DENIED_JOIN_TYPES.contains(join.getJoinType()) ||
join.isMarkJoin()) {
@@ -105,10 +105,6 @@ public class RuntimeFilterGenerator extends
PlanPostProcessor {
if (type == TRuntimeFilterType.BITMAP) {
continue;
}
- // in-filter is not friendly to pipeline
- if (type == TRuntimeFilterType.IN_OR_BLOOM &&
ctx.getSessionVariable().enablePipelineEngine()) {
- type = TRuntimeFilterType.BLOOM;
- }
// currently, we can ensure children in the two side are
corresponding to the equal_to's.
// so right maybe an expression and left is a slot
Slot unwrappedSlot = checkTargetChild(equalTo.left());
@@ -118,18 +114,35 @@ public class RuntimeFilterGenerator extends
PlanPostProcessor {
continue;
}
Slot olapScanSlot =
aliasTransferMap.get(unwrappedSlot).second;
+ PhysicalRelation scan =
aliasTransferMap.get(unwrappedSlot).first;
+ // in-filter is not friendly to pipeline
+ if (type == TRuntimeFilterType.IN_OR_BLOOM
+ && ctx.getSessionVariable().enablePipelineEngine()
+ && hasRemoteTarget(join, scan)) {
+ type = TRuntimeFilterType.BLOOM;
+ }
+
long buildSideNdv = getBuildSideNdv(join, equalTo);
RuntimeFilter filter = new
RuntimeFilter(generator.getNextId(),
equalTo.right(), olapScanSlot, type, i, join,
buildSideNdv);
ctx.addJoinToTargetMap(join, olapScanSlot.getExprId());
ctx.setTargetExprIdToFilter(olapScanSlot.getExprId(),
filter);
-
ctx.setTargetsOnScanNode(aliasTransferMap.get(unwrappedSlot).first,
olapScanSlot);
+
ctx.setTargetsOnScanNode(aliasTransferMap.get(unwrappedSlot).first.getId(),
olapScanSlot);
}
}
}
return join;
}
+    /**
+     * Tells whether {@code scan} carries a different fragment id than {@code join}.
+     *
+     * @param join the join node; must have {@link AbstractPlan#FRAGMENT_ID} set
+     * @param scan the scan node; must have {@link AbstractPlan#FRAGMENT_ID} set
+     * @return true if the two recorded fragment ids differ
+     * @throws IllegalArgumentException if either node has no fragment id recorded
+     */
+    private boolean hasRemoteTarget(AbstractPlan join, AbstractPlan scan) {
+        Preconditions.checkArgument(join.getMutableState(AbstractPlan.FRAGMENT_ID).isPresent(),
+                "cannot find fragment id for Join node");
+        Preconditions.checkArgument(scan.getMutableState(AbstractPlan.FRAGMENT_ID).isPresent(),
+                "cannot find fragment id for scan node");
+        // Compare by value: both sides are object references, so != would test identity and could
+        // report equal ids as different.
+        return !join.getMutableState(AbstractPlan.FRAGMENT_ID).get()
+                .equals(scan.getMutableState(AbstractPlan.FRAGMENT_ID).get());
+    }
+
private long getBuildSideNdv(PhysicalHashJoin<? extends Plan, ? extends
Plan> join, EqualTo equalTo) {
AbstractPlan right = (AbstractPlan) join.right();
//make ut test friendly
@@ -151,7 +164,7 @@ public class RuntimeFilterGenerator extends
PlanPostProcessor {
return join;
}
RuntimeFilterContext ctx = context.getRuntimeFilterContext();
- Map<NamedExpression, Pair<ObjectId, Slot>> aliasTransferMap =
ctx.getAliasTransferMap();
+ Map<NamedExpression, Pair<PhysicalRelation, Slot>> aliasTransferMap =
ctx.getAliasTransferMap();
if ((ctx.getSessionVariable().getRuntimeFilterType() &
TRuntimeFilterType.BITMAP.getValue()) == 0) {
//only generate BITMAP filter for nested loop join
@@ -185,7 +198,8 @@ public class RuntimeFilterGenerator extends
PlanPostProcessor {
bitmapContains.child(1), type, i, join, isNot,
-1L);
ctx.addJoinToTargetMap(join, olapScanSlot.getExprId());
ctx.setTargetExprIdToFilter(olapScanSlot.getExprId(),
filter);
-
ctx.setTargetsOnScanNode(aliasTransferMap.get(targetSlot).first, olapScanSlot);
+
ctx.setTargetsOnScanNode(aliasTransferMap.get(targetSlot).first.getId(),
+ olapScanSlot);
join.addBitmapRuntimeFilterCondition(bitmapRuntimeFilterCondition);
}
}
@@ -196,7 +210,7 @@ public class RuntimeFilterGenerator extends
PlanPostProcessor {
@Override
public PhysicalPlan visitPhysicalProject(PhysicalProject<? extends Plan>
project, CascadesContext context) {
project.child().accept(this, context);
- Map<NamedExpression, Pair<ObjectId, Slot>> aliasTransferMap
+ Map<NamedExpression, Pair<PhysicalRelation, Slot>> aliasTransferMap
= context.getRuntimeFilterContext().getAliasTransferMap();
// change key when encounter alias.
for (Expression expression : project.getProjects()) {
@@ -218,7 +232,7 @@ public class RuntimeFilterGenerator extends
PlanPostProcessor {
public PhysicalRelation visitPhysicalScan(PhysicalRelation scan,
CascadesContext context) {
// add all the slots in map.
RuntimeFilterContext ctx = context.getRuntimeFilterContext();
- scan.getOutput().forEach(slot -> ctx.getAliasTransferMap().put(slot,
Pair.of(scan.getId(), slot)));
+ scan.getOutput().forEach(slot -> ctx.getAliasTransferMap().put(slot,
Pair.of(scan, slot)));
return scan;
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/AbstractPlan.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/AbstractPlan.java
index a273daf114..f3d545c8ea 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/AbstractPlan.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/AbstractPlan.java
@@ -52,6 +52,7 @@ import javax.annotation.Nullable;
* Abstract class for all concrete plan node.
*/
public abstract class AbstractPlan extends AbstractTreeNode<Plan> implements
Plan {
+ public static final String FRAGMENT_ID = "fragment";
private static final EventProducer PLAN_CONSTRUCT_TRACER = new
EventProducer(CounterEvent.class,
EventChannel.getDefaultChannel()
.addEnhancers(new AddCounterEventEnhancer())
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalHashJoin.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalHashJoin.java
index 013a8818a9..7a75383868 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalHashJoin.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalHashJoin.java
@@ -24,6 +24,7 @@ import org.apache.doris.nereids.properties.PhysicalProperties;
import org.apache.doris.nereids.trees.expressions.ExprId;
import org.apache.doris.nereids.trees.expressions.Expression;
import org.apache.doris.nereids.trees.expressions.MarkJoinSlotReference;
+import org.apache.doris.nereids.trees.plans.AbstractPlan;
import org.apache.doris.nereids.trees.plans.JoinHint;
import org.apache.doris.nereids.trees.plans.JoinType;
import org.apache.doris.nereids.trees.plans.Plan;
@@ -132,7 +133,8 @@ public class PhysicalHashJoin<
List<Object> args = Lists.newArrayList("type", joinType,
"hashJoinCondition", hashJoinConjuncts,
"otherJoinCondition", otherJoinConjuncts,
- "stats", statistics);
+ "stats", statistics,
+ "fr", getMutableState(AbstractPlan.FRAGMENT_ID));
if (markJoinSlotReference.isPresent()) {
args.add("isMarkJoin");
args.add("true");
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalOlapScan.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalOlapScan.java
index fc484bf6c6..ab5f74550b 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalOlapScan.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalOlapScan.java
@@ -23,6 +23,7 @@ import org.apache.doris.nereids.properties.DistributionSpec;
import org.apache.doris.nereids.properties.LogicalProperties;
import org.apache.doris.nereids.properties.PhysicalProperties;
import org.apache.doris.nereids.trees.expressions.Slot;
+import org.apache.doris.nereids.trees.plans.AbstractPlan;
import org.apache.doris.nereids.trees.plans.ObjectId;
import org.apache.doris.nereids.trees.plans.PlanType;
import org.apache.doris.nereids.trees.plans.PreAggStatus;
@@ -119,7 +120,7 @@ public class PhysicalOlapScan extends PhysicalRelation
implements OlapScan {
public String toString() {
return Utils.toSqlString("PhysicalOlapScan[" + id.asInt() + "]" +
getGroupIdAsString(),
"qualified", Utils.qualifiedName(qualifier,
olapTable.getName()),
- "stats", statistics
+ "stats", statistics, "fr",
getMutableState(AbstractPlan.FRAGMENT_ID)
);
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
index 00ed7a501d..f26e5fc47e 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
@@ -639,7 +639,7 @@ public class SessionVariable implements Serializable,
Writable {
private int runtimeFilterType = 8;
@VariableMgr.VarAttr(name = RUNTIME_FILTER_MAX_IN_NUM)
- private int runtimeFilterMaxInNum = 102400;
+ private int runtimeFilterMaxInNum = 1024;
@VariableMgr.VarAttr(name = USE_RF_DEFAULT)
public boolean useRuntimeFilterDefaultSize = false;
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]