This is an automated email from the ASF dual-hosted git repository.
morrysnow pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 3117ac9289 [enhancement](Nereids) use post-order to generate runtime
filter in RuntimeFilterGenerator (#13949)
3117ac9289 is described below
commit 3117ac9289fbfc37728f9f022289aa19b77a531d
Author: mch_ucchi <[email protected]>
AuthorDate: Wed Nov 9 14:28:49 2022 +0800
[enhancement](Nereids) use post-order to generate runtime filter in
RuntimeFilterGenerator (#13949)
Change the runtime filter generator's plan traversal from pre-order to
post-order. This may change the number of runtime filters that are generated,
so the unit tests (UT) are adjusted accordingly.
---
.../glue/translator/RuntimeFilterTranslator.java | 5 +-
.../processor/post/RuntimeFilterContext.java | 62 ++--------
.../processor/post/RuntimeFilterGenerator.java | 130 ++++++++++-----------
.../trees/plans/physical/RuntimeFilter.java | 44 -------
.../nereids/postprocess/RuntimeFilterTest.java | 7 +-
5 files changed, 78 insertions(+), 170 deletions(-)
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/RuntimeFilterTranslator.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/RuntimeFilterTranslator.java
index 1dc7a0a2c1..09dbb4115b 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/RuntimeFilterTranslator.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/RuntimeFilterTranslator.java
@@ -60,9 +60,8 @@ public class RuntimeFilterTranslator {
* @param ctx plan translator context
*/
public void translateRuntimeFilterTarget(Slot slot, OlapScanNode node,
PlanTranslatorContext ctx) {
- context.setKVInNormalMap(context.getExprIdToOlapScanNodeSlotRef(),
- slot.getExprId(), ctx.findSlotRef(slot.getExprId()));
-
context.setKVInNormalMap(context.getScanNodeOfLegacyRuntimeFilterTarget(),
slot, node);
+ context.getExprIdToOlapScanNodeSlotRef().put(slot.getExprId(),
ctx.findSlotRef(slot.getExprId()));
+ context.getScanNodeOfLegacyRuntimeFilterTarget().put(slot, node);
}
/**
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/RuntimeFilterContext.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/RuntimeFilterContext.java
index 5565dcf928..03c7be5f17 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/RuntimeFilterContext.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/RuntimeFilterContext.java
@@ -19,6 +19,7 @@ package org.apache.doris.nereids.processor.post;
import org.apache.doris.analysis.SlotRef;
import org.apache.doris.common.IdGenerator;
+import org.apache.doris.common.Pair;
import org.apache.doris.nereids.trees.expressions.ExprId;
import org.apache.doris.nereids.trees.expressions.NamedExpression;
import org.apache.doris.nereids.trees.expressions.Slot;
@@ -32,12 +33,9 @@ import org.apache.doris.qe.SessionVariable;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
-import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
-import org.jetbrains.annotations.NotNull;
-import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
@@ -62,8 +60,10 @@ public class RuntimeFilterContext {
private final Map<PhysicalHashJoin, List<RuntimeFilter>>
runtimeFilterOnHashJoinNode = Maps.newHashMap();
- // Alias's child to itself.
- private final Map<Slot, NamedExpression> aliasChildToSelf =
Maps.newHashMap();
+ // alias -> alias's child, if there's a key that is alias's child, the
key-value will change by this way
+ // Alias(A) = B, now B -> A in map, and encounter Alias(B) -> C, the kv
will be C -> A.
+ // you can see disjoint set data structure to learn the processing
detailed.
+ private final Map<NamedExpression, Pair<RelationId, NamedExpression>>
aliasTransferMap = Maps.newHashMap();
private final Map<Slot, OlapScanNode> scanNodeOfLegacyRuntimeFilterTarget
= Maps.newHashMap();
@@ -86,36 +86,21 @@ public class RuntimeFilterContext {
return limits;
}
- public void setTargetExprIdToFilters(ExprId id, RuntimeFilter... filters) {
- Preconditions.checkArgument(Arrays.stream(filters)
- .allMatch(filter -> filter.getTargetExpr().getExprId() == id));
- this.targetExprIdToFilter.computeIfAbsent(id, k ->
Lists.newArrayList())
- .addAll(Arrays.asList(filters));
+ public void setTargetExprIdToFilter(ExprId id, RuntimeFilter filter) {
+ Preconditions.checkArgument(filter.getTargetExpr().getExprId() == id);
+ this.targetExprIdToFilter.computeIfAbsent(id, k ->
Lists.newArrayList()).add(filter);
}
- public List<RuntimeFilter> getFiltersByTargetExprId(ExprId id) {
- return targetExprIdToFilter.get(id);
- }
-
- public void removeFilters(ExprId id) {
- targetExprIdToFilter.remove(id);
- }
-
- public void setTargetsOnScanNode(RelationId id, Slot... slots) {
- this.targetOnOlapScanNodeMap.computeIfAbsent(id, k ->
Lists.newArrayList())
- .addAll(Arrays.asList(slots));
- }
-
- public <K, V> void setKVInNormalMap(@NotNull Map<K, V> map, K key, V
value) {
- map.put(key, value);
+ public void setTargetsOnScanNode(RelationId id, Slot slot) {
+ this.targetOnOlapScanNodeMap.computeIfAbsent(id, k ->
Lists.newArrayList()).add(slot);
}
public Map<ExprId, SlotRef> getExprIdToOlapScanNodeSlotRef() {
return exprIdToOlapScanNodeSlotRef;
}
- public Map<Slot, NamedExpression> getAliasChildToSelf() {
- return aliasChildToSelf;
+ public Map<NamedExpression, Pair<RelationId, NamedExpression>>
getAliasTransferMap() {
+ return aliasTransferMap;
}
public Map<Slot, OlapScanNode> getScanNodeOfLegacyRuntimeFilterTarget() {
@@ -143,14 +128,6 @@ public class RuntimeFilterContext {
return legacyFilters;
}
- public void setLegacyFilter(org.apache.doris.planner.RuntimeFilter filter)
{
- this.legacyFilters.add(filter);
- }
-
- public <K, V> boolean checkExistKey(@NotNull Map<K, V> map, K key) {
- return map.containsKey(key);
- }
-
/**
* get nereids runtime filters
* @return nereids runtime filters
@@ -166,21 +143,6 @@ public class RuntimeFilterContext {
return filters;
}
- /**
- * get the slot list of the same olap scan node of the input slot.
- * @param slot slot
- * @return slot list
- */
- public List<NamedExpression> getSlotListOfTheSameSlotAtOlapScanNode(Slot
slot) {
- ImmutableList.Builder<NamedExpression> builder =
ImmutableList.builder();
- NamedExpression expr = slot;
- do {
- builder.add(expr);
- expr = aliasChildToSelf.get(expr.toSlot());
- } while (expr != null);
- return builder.build();
- }
-
public void setTargetNullCount() {
targetNullCount++;
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/RuntimeFilterGenerator.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/RuntimeFilterGenerator.java
index cc8531bafc..51c84f2dd6 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/RuntimeFilterGenerator.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/RuntimeFilterGenerator.java
@@ -18,17 +18,23 @@
package org.apache.doris.nereids.processor.post;
import org.apache.doris.common.IdGenerator;
+import org.apache.doris.common.Pair;
import org.apache.doris.nereids.CascadesContext;
import org.apache.doris.nereids.trees.expressions.Alias;
import org.apache.doris.nereids.trees.expressions.EqualTo;
+import org.apache.doris.nereids.trees.expressions.Expression;
+import org.apache.doris.nereids.trees.expressions.NamedExpression;
+import org.apache.doris.nereids.trees.expressions.Slot;
import org.apache.doris.nereids.trees.expressions.SlotReference;
import org.apache.doris.nereids.trees.plans.JoinType;
import org.apache.doris.nereids.trees.plans.Plan;
+import org.apache.doris.nereids.trees.plans.RelationId;
import org.apache.doris.nereids.trees.plans.physical.PhysicalHashJoin;
import org.apache.doris.nereids.trees.plans.physical.PhysicalOlapScan;
import org.apache.doris.nereids.trees.plans.physical.PhysicalPlan;
import org.apache.doris.nereids.trees.plans.physical.PhysicalProject;
import org.apache.doris.nereids.trees.plans.physical.RuntimeFilter;
+import org.apache.doris.nereids.util.JoinUtils;
import org.apache.doris.planner.RuntimeFilterId;
import org.apache.doris.thrift.TRuntimeFilterType;
@@ -36,7 +42,8 @@ import com.google.common.collect.ImmutableSet;
import java.util.Arrays;
import java.util.List;
-import java.util.Objects;
+import java.util.Map;
+import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
@@ -44,15 +51,12 @@ import java.util.stream.Collectors;
* generate runtime filter
*/
public class RuntimeFilterGenerator extends PlanPostProcessor {
-
- private final IdGenerator<RuntimeFilterId> generator =
RuntimeFilterId.createGenerator();
-
- private final ImmutableSet<JoinType> deniedJoinType = ImmutableSet.of(
+ private static final ImmutableSet<JoinType> deniedJoinType =
ImmutableSet.of(
JoinType.LEFT_ANTI_JOIN,
- JoinType.RIGHT_ANTI_JOIN,
JoinType.FULL_OUTER_JOIN,
JoinType.LEFT_OUTER_JOIN
);
+ private final IdGenerator<RuntimeFilterId> generator =
RuntimeFilterId.createGenerator();
/**
* the runtime filter generator run at the phase of post process and plan
translation of nereids planner.
@@ -72,94 +76,80 @@ public class RuntimeFilterGenerator extends
PlanPostProcessor {
public PhysicalPlan visitPhysicalHashJoin(PhysicalHashJoin<? extends Plan,
? extends Plan> join,
CascadesContext context) {
RuntimeFilterContext ctx = context.getRuntimeFilterContext();
+ Map<NamedExpression, Pair<RelationId, NamedExpression>>
aliasTransferMap = ctx.getAliasTransferMap();
+ join.right().accept(this, context);
+ join.left().accept(this, context);
if (deniedJoinType.contains(join.getJoinType())) {
- /* TODO: translate left outer join to inner join if there are
inner join ancestors
- * if it has encountered inner join, like
- * a=b
- * / \
- * / \
- * / \
- * / \
- * left join-->a=c b
- * / \
- * / \
- * / \
- * / \
- * a c
- * runtime filter whose src expr is b can take effect on c.
- * but now checking the inner join is unsupported. we may support
it at later version.
- */
- join.getOutput().forEach(slot ->
ctx.removeFilters(slot.getExprId()));
+ // copy to avoid bug when next call of getOutputSet()
+ Set<Slot> slots = join.getOutputSet();
+ slots.forEach(aliasTransferMap::remove);
} else {
- List<TRuntimeFilterType> legalTypes =
Arrays.stream(TRuntimeFilterType.values()).filter(type ->
- (type.getValue() &
ctx.getSessionVariable().getRuntimeFilterType()) > 0)
+ List<TRuntimeFilterType> legalTypes =
Arrays.stream(TRuntimeFilterType.values())
+ .filter(type -> (type.getValue() &
ctx.getSessionVariable().getRuntimeFilterType()) > 0)
.collect(Collectors.toList());
AtomicInteger cnt = new AtomicInteger();
join.getHashJoinConjuncts().stream()
.map(EqualTo.class::cast)
+ // TODO: some complex situation cannot be handled now, see
testPushDownThroughJoin.
// TODO: we will support it in later version.
- /*.peek(expr -> {
- // target is always the expr at the two side of equal
of hash conjunctions.
- // TODO: some complex situation cannot be handled now,
see testPushDownThroughJoin.
- List<SlotReference> slots =
expr.children().stream().filter(SlotReference.class::isInstance)
-
.map(SlotReference.class::cast).collect(Collectors.toList());
- if (slots.size() != 2
- ||
!(ctx.checkExistKey(ctx.getTargetExprIdToFilter(), slots.get(0).getExprId())
- ||
ctx.checkExistKey(ctx.getTargetExprIdToFilter(), slots.get(1).getExprId()))) {
+ .forEach(expr -> legalTypes.forEach(type -> {
+ Pair<Expression, Expression> normalizedChildren =
checkAndMaybeSwapChild(expr, join);
+ // aliasTransMap doesn't contain the key, means that
the path from the olap scan to the join
+ // contains join with denied join type. for example: a
left join b on a.id = b.id
+ if (normalizedChildren == null
+ || !aliasTransferMap.containsKey((Slot)
normalizedChildren.first)) {
return;
}
- int tag =
ctx.checkExistKey(ctx.getTargetExprIdToFilter(), slots.get(0).getExprId()) ? 0
: 1;
- // generate runtime filter to associated expr. for
example, a = b and a = c, RF b -> a can
- // generate RF b -> c
- List<RuntimeFilter> copiedRuntimeFilter =
ctx.getFiltersByTargetExprId(slots.get(tag)
- .getExprId()).stream()
- .map(filter -> new
RuntimeFilter(generator.getNextId(), filter.getSrcExpr(),
- slots.get(tag ^ 1), filter.getType(),
filter.getExprOrder(), join))
- .collect(Collectors.toList());
- ctx.setTargetExprIdToFilters(slots.get(tag ^
1).getExprId(),
- copiedRuntimeFilter.toArray(new
RuntimeFilter[0]));
- })*/
- .forEach(expr -> legalTypes.stream()
- .map(type ->
RuntimeFilter.createRuntimeFilter(generator.getNextId(), expr,
- type, cnt.getAndIncrement(), join))
- .filter(Objects::nonNull)
- .forEach(filter ->
-
ctx.setTargetExprIdToFilters(filter.getTargetExpr().getExprId(), filter)));
+ Pair<Slot, Slot> slots = Pair.of(
+ aliasTransferMap.get((Slot)
normalizedChildren.first).second.toSlot(),
+ ((Slot) normalizedChildren.second));
+ RuntimeFilter filter = new
RuntimeFilter(generator.getNextId(),
+ slots.second, slots.first, type,
+ cnt.getAndIncrement(), join);
+ ctx.setTargetExprIdToFilter(slots.first.getExprId(),
filter);
+ ctx.setTargetsOnScanNode(
+ aliasTransferMap.get((Slot)
normalizedChildren.first).first,
+ slots.first);
+ }));
}
- join.left().accept(this, context);
- join.right().accept(this, context);
return join;
}
// TODO: support src key is agg slot.
@Override
public PhysicalPlan visitPhysicalProject(PhysicalProject<? extends Plan>
project, CascadesContext context) {
- RuntimeFilterContext ctx = context.getRuntimeFilterContext();
+ project.child().accept(this, context);
+ Map<NamedExpression, Pair<RelationId, NamedExpression>>
aliasTransferMap
+ = context.getRuntimeFilterContext().getAliasTransferMap();
+ // change key when encounter alias.
project.getProjects().stream().filter(Alias.class::isInstance)
.map(Alias.class::cast)
- .filter(expr -> expr.child() instanceof SlotReference)
- .forEach(expr ->
ctx.setKVInNormalMap(ctx.getAliasChildToSelf(), ((SlotReference) expr.child()),
expr));
- project.child().accept(this, context);
+ .filter(alias -> alias.child() instanceof NamedExpression
+ && aliasTransferMap.containsKey((NamedExpression)
alias.child()))
+ .forEach(alias -> {
+ NamedExpression child = ((NamedExpression) alias.child());
+ aliasTransferMap.put(alias.toSlot(),
aliasTransferMap.remove(child));
+ });
return project;
}
@Override
public PhysicalOlapScan visitPhysicalOlapScan(PhysicalOlapScan scan,
CascadesContext context) {
+ // add all the slots in map.
RuntimeFilterContext ctx = context.getRuntimeFilterContext();
- scan.getOutput().stream()
- .filter(slot ->
ctx.getSlotListOfTheSameSlotAtOlapScanNode(slot).stream()
- .filter(expr ->
ctx.checkExistKey(ctx.getTargetExprIdToFilter(), expr.getExprId()))
- .peek(expr -> {
- if (expr.getExprId() == slot.getExprId()) {
- return;
- }
- List<RuntimeFilter> filters =
ctx.getFiltersByTargetExprId(expr.getExprId());
- ctx.removeFilters(expr.getExprId());
- filters.forEach(filter ->
filter.setTargetSlot(slot));
-
ctx.setKVInNormalMap(ctx.getTargetExprIdToFilter(), slot.getExprId(), filters);
- })
- .count() > 0)
- .forEach(slot -> ctx.setTargetsOnScanNode(scan.getId(), slot));
+ scan.getOutput().forEach(slot -> ctx.getAliasTransferMap().put(slot,
Pair.of(scan.getId(), slot)));
return scan;
}
+
+ private static Pair<Expression, Expression> checkAndMaybeSwapChild(EqualTo
expr,
+ PhysicalHashJoin<? extends Plan, ? extends Plan> join) {
+ if (expr.child(0).equals(expr.child(1))
+ ||
!expr.children().stream().allMatch(SlotReference.class::isInstance)) {
+ return null;
+ }
+ // current we assume that there are certainly different slot reference
in equal to.
+ // they are not from the same relation.
+ List<Expression> children =
JoinUtils.swapEqualToForChildrenOrder(expr,
join.left().getOutputSet()).children();
+ return Pair.of(children.get(0), children.get(1));
+ }
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/RuntimeFilter.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/RuntimeFilter.java
index 8eeb6071d7..326bc6b634 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/RuntimeFilter.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/RuntimeFilter.java
@@ -17,12 +17,7 @@
package org.apache.doris.nereids.trees.plans.physical;
-import org.apache.doris.common.Pair;
-import org.apache.doris.nereids.trees.expressions.EqualTo;
-import org.apache.doris.nereids.trees.expressions.Expression;
import org.apache.doris.nereids.trees.expressions.Slot;
-import org.apache.doris.nereids.trees.expressions.SlotReference;
-import org.apache.doris.nereids.trees.expressions.literal.Literal;
import org.apache.doris.planner.RuntimeFilterId;
import org.apache.doris.thrift.TRuntimeFilterType;
@@ -58,37 +53,6 @@ public class RuntimeFilter {
this.builderNode = builderNode;
}
- /**
- * create RF
- */
- public static RuntimeFilter createRuntimeFilter(RuntimeFilterId id,
EqualTo conjunction,
- TRuntimeFilterType type, int exprOrder, PhysicalHashJoin node) {
- Pair<Expression, Expression> srcs =
checkAndMaybeSwapChild(conjunction, node);
- if (srcs == null) {
- return null;
- }
- return new RuntimeFilter(id, ((SlotReference) srcs.second),
((SlotReference) srcs.first), type, exprOrder,
- node);
- }
-
- private static Pair<Expression, Expression> checkAndMaybeSwapChild(EqualTo
expr,
- PhysicalHashJoin join) {
- if (expr.children().stream().anyMatch(Literal.class::isInstance)) {
- return null;
- }
- if (expr.child(0).equals(expr.child(1))) {
- return null;
- }
- if
(!expr.children().stream().allMatch(SlotReference.class::isInstance)) {
- return null;
- }
- // current we assume that there are certainly different slot reference
in equal to.
- // they are not from the same relation.
- int exchangeTag = join.child(0).getOutput().stream().anyMatch(slot ->
slot.getExprId().equals(
- ((SlotReference) expr.child(1)).getExprId())) ? 1 : 0;
- return Pair.of(expr.child(exchangeTag), expr.child(1 ^ exchangeTag));
- }
-
public Slot getSrcExpr() {
return srcSlot;
}
@@ -113,14 +77,6 @@ public class RuntimeFilter {
return builderNode;
}
- public void setTargetSlot(Slot targetSlot) {
- this.targetSlot = targetSlot;
- }
-
- public boolean isUninitialized() {
- return !finalized;
- }
-
public void setFinalized() {
this.finalized = true;
}
diff --git
a/fe/fe-core/src/test/java/org/apache/doris/nereids/postprocess/RuntimeFilterTest.java
b/fe/fe-core/src/test/java/org/apache/doris/nereids/postprocess/RuntimeFilterTest.java
index 7b0fe88971..617ccf0f05 100644
---
a/fe/fe-core/src/test/java/org/apache/doris/nereids/postprocess/RuntimeFilterTest.java
+++
b/fe/fe-core/src/test/java/org/apache/doris/nereids/postprocess/RuntimeFilterTest.java
@@ -228,9 +228,10 @@ public class RuntimeFilterTest extends SSBTestBase {
private void checkRuntimeFilterExprs(List<RuntimeFilter> filters,
List<Pair<String, String>> colNames) {
Assertions.assertEquals(filters.size(), colNames.size());
- for (int i = 0; i < filters.size(); i++) {
-
Assertions.assertTrue(filters.get(i).getSrcExpr().toSql().equals(colNames.get(i).first)
- &&
filters.get(i).getTargetExpr().toSql().equals(colNames.get(i).second));
+ for (RuntimeFilter filter : filters) {
+ Assertions.assertTrue(colNames.contains(Pair.of(
+ filter.getSrcExpr().getName(),
+ filter.getTargetExpr().getName())));
}
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]