morrySnow commented on code in PR #13949:
URL: https://github.com/apache/doris/pull/13949#discussion_r1015655477
##########
fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/RuntimeFilterGenerator.java:
##########
@@ -49,7 +56,6 @@ public class RuntimeFilterGenerator extends PlanPostProcessor
{
private final ImmutableSet<JoinType> deniedJoinType = ImmutableSet.of(
Review Comment:
static
##########
fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/RuntimeFilterGenerator.java:
##########
@@ -72,94 +78,78 @@ public class RuntimeFilterGenerator extends
PlanPostProcessor {
public PhysicalPlan visitPhysicalHashJoin(PhysicalHashJoin<? extends Plan,
? extends Plan> join,
CascadesContext context) {
RuntimeFilterContext ctx = context.getRuntimeFilterContext();
- if (deniedJoinType.contains(join.getJoinType())) {
- /* TODO: translate left outer join to inner join if there are
inner join ancestors
- * if it has encountered inner join, like
- * a=b
- * / \
- * / \
- * / \
- * / \
- * left join-->a=c b
- * / \
- * / \
- * / \
- * / \
- * a c
- * runtime filter whose src expr is b can take effect on c.
- * but now checking the inner join is unsupported. we may support
it at later version.
- */
- join.getOutput().forEach(slot ->
ctx.removeFilters(slot.getExprId()));
- } else {
+ Map<NamedExpression, Pair<RelationId, NamedExpression>>
aliasTransferMap = ctx.getAliasTransferMap();
+ join.right().accept(this, context);
+ join.left().accept(this, context);
+ if (!deniedJoinType.contains(join.getJoinType())) {
Review Comment:
nit
```suggestion
if (deniedJoinType.contains(join.getJoinType())) {
...
} else {
...
}
```
##########
fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/RuntimeFilterGenerator.java:
##########
@@ -72,94 +78,78 @@ public class RuntimeFilterGenerator extends
PlanPostProcessor {
public PhysicalPlan visitPhysicalHashJoin(PhysicalHashJoin<? extends Plan,
? extends Plan> join,
CascadesContext context) {
RuntimeFilterContext ctx = context.getRuntimeFilterContext();
- if (deniedJoinType.contains(join.getJoinType())) {
- /* TODO: translate left outer join to inner join if there are
inner join ancestors
- * if it has encountered inner join, like
- * a=b
- * / \
- * / \
- * / \
- * / \
- * left join-->a=c b
- * / \
- * / \
- * / \
- * / \
- * a c
- * runtime filter whose src expr is b can take effect on c.
- * but now checking the inner join is unsupported. we may support
it at later version.
- */
- join.getOutput().forEach(slot ->
ctx.removeFilters(slot.getExprId()));
- } else {
+ Map<NamedExpression, Pair<RelationId, NamedExpression>>
aliasTransferMap = ctx.getAliasTransferMap();
+ join.right().accept(this, context);
+ join.left().accept(this, context);
+ if (!deniedJoinType.contains(join.getJoinType())) {
List<TRuntimeFilterType> legalTypes =
Arrays.stream(TRuntimeFilterType.values()).filter(type ->
(type.getValue() &
ctx.getSessionVariable().getRuntimeFilterType()) > 0)
.collect(Collectors.toList());
Review Comment:
```suggestion
List<TRuntimeFilterType> legalTypes =
Arrays.stream(TRuntimeFilterType.values())
.filter(type -> (type.getValue() &
ctx.getSessionVariable().getRuntimeFilterType()) > 0)
.collect(Collectors.toList());
```
##########
fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/RuntimeFilterGenerator.java:
##########
@@ -72,94 +78,78 @@ public class RuntimeFilterGenerator extends
PlanPostProcessor {
public PhysicalPlan visitPhysicalHashJoin(PhysicalHashJoin<? extends Plan,
? extends Plan> join,
CascadesContext context) {
RuntimeFilterContext ctx = context.getRuntimeFilterContext();
- if (deniedJoinType.contains(join.getJoinType())) {
- /* TODO: translate left outer join to inner join if there are
inner join ancestors
- * if it has encountered inner join, like
- * a=b
- * / \
- * / \
- * / \
- * / \
- * left join-->a=c b
- * / \
- * / \
- * / \
- * / \
- * a c
- * runtime filter whose src expr is b can take effect on c.
- * but now checking the inner join is unsupported. we may support
it at later version.
- */
- join.getOutput().forEach(slot ->
ctx.removeFilters(slot.getExprId()));
- } else {
+ Map<NamedExpression, Pair<RelationId, NamedExpression>>
aliasTransferMap = ctx.getAliasTransferMap();
+ join.right().accept(this, context);
+ join.left().accept(this, context);
+ if (!deniedJoinType.contains(join.getJoinType())) {
List<TRuntimeFilterType> legalTypes =
Arrays.stream(TRuntimeFilterType.values()).filter(type ->
(type.getValue() &
ctx.getSessionVariable().getRuntimeFilterType()) > 0)
.collect(Collectors.toList());
AtomicInteger cnt = new AtomicInteger();
join.getHashJoinConjuncts().stream()
.map(EqualTo.class::cast)
+ // TODO: some complex situation cannot be handled now, see
testPushDownThroughJoin.
// TODO: we will support it in later version.
- /*.peek(expr -> {
- // target is always the expr at the two side of equal
of hash conjunctions.
- // TODO: some complex situation cannot be handled now,
see testPushDownThroughJoin.
- List<SlotReference> slots =
expr.children().stream().filter(SlotReference.class::isInstance)
-
.map(SlotReference.class::cast).collect(Collectors.toList());
- if (slots.size() != 2
- ||
!(ctx.checkExistKey(ctx.getTargetExprIdToFilter(), slots.get(0).getExprId())
- ||
ctx.checkExistKey(ctx.getTargetExprIdToFilter(), slots.get(1).getExprId()))) {
+ .forEach(expr -> legalTypes.forEach(type -> {
+ Pair<Expression, Expression> exprs =
checkAndMaybeSwapChild(expr, join);
+ if (exprs == null ||
!aliasTransferMap.containsKey((Slot) exprs.first)) {
Review Comment:
plz add some comment to explain the if statement
##########
fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/RuntimeFilterContext.java:
##########
@@ -86,24 +87,19 @@ public FilterSizeLimits getLimits() {
return limits;
}
- public void setTargetExprIdToFilters(ExprId id, RuntimeFilter... filters) {
- Preconditions.checkArgument(Arrays.stream(filters)
- .allMatch(filter -> filter.getTargetExpr().getExprId() == id));
+ public void setTargetExprIdToFilter(ExprId id, RuntimeFilter filter) {
+ Preconditions.checkArgument(filter.getTargetExpr().getExprId() == id);
this.targetExprIdToFilter.computeIfAbsent(id, k ->
Lists.newArrayList())
- .addAll(Arrays.asList(filters));
- }
-
- public List<RuntimeFilter> getFiltersByTargetExprId(ExprId id) {
- return targetExprIdToFilter.get(id);
+ .add(filter);
}
public void removeFilters(ExprId id) {
targetExprIdToFilter.remove(id);
}
- public void setTargetsOnScanNode(RelationId id, Slot... slots) {
+ public void setTargetsOnScanNode(RelationId id, Slot slot) {
this.targetOnOlapScanNodeMap.computeIfAbsent(id, k ->
Lists.newArrayList())
- .addAll(Arrays.asList(slots));
+ .add(slot);
Review Comment:
ditto
##########
fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/RuntimeFilterGenerator.java:
##########
@@ -72,94 +78,78 @@ public class RuntimeFilterGenerator extends
PlanPostProcessor {
public PhysicalPlan visitPhysicalHashJoin(PhysicalHashJoin<? extends Plan,
? extends Plan> join,
CascadesContext context) {
RuntimeFilterContext ctx = context.getRuntimeFilterContext();
- if (deniedJoinType.contains(join.getJoinType())) {
- /* TODO: translate left outer join to inner join if there are
inner join ancestors
- * if it has encountered inner join, like
- * a=b
- * / \
- * / \
- * / \
- * / \
- * left join-->a=c b
- * / \
- * / \
- * / \
- * / \
- * a c
- * runtime filter whose src expr is b can take effect on c.
- * but now checking the inner join is unsupported. we may support
it at later version.
- */
- join.getOutput().forEach(slot ->
ctx.removeFilters(slot.getExprId()));
- } else {
+ Map<NamedExpression, Pair<RelationId, NamedExpression>>
aliasTransferMap = ctx.getAliasTransferMap();
+ join.right().accept(this, context);
+ join.left().accept(this, context);
+ if (!deniedJoinType.contains(join.getJoinType())) {
List<TRuntimeFilterType> legalTypes =
Arrays.stream(TRuntimeFilterType.values()).filter(type ->
(type.getValue() &
ctx.getSessionVariable().getRuntimeFilterType()) > 0)
.collect(Collectors.toList());
AtomicInteger cnt = new AtomicInteger();
join.getHashJoinConjuncts().stream()
.map(EqualTo.class::cast)
+ // TODO: some complex situation cannot be handled now, see
testPushDownThroughJoin.
// TODO: we will support it in later version.
- /*.peek(expr -> {
- // target is always the expr at the two side of equal
of hash conjunctions.
- // TODO: some complex situation cannot be handled now,
see testPushDownThroughJoin.
- List<SlotReference> slots =
expr.children().stream().filter(SlotReference.class::isInstance)
-
.map(SlotReference.class::cast).collect(Collectors.toList());
- if (slots.size() != 2
- ||
!(ctx.checkExistKey(ctx.getTargetExprIdToFilter(), slots.get(0).getExprId())
- ||
ctx.checkExistKey(ctx.getTargetExprIdToFilter(), slots.get(1).getExprId()))) {
+ .forEach(expr -> legalTypes.forEach(type -> {
+ Pair<Expression, Expression> exprs =
checkAndMaybeSwapChild(expr, join);
+ if (exprs == null ||
!aliasTransferMap.containsKey((Slot) exprs.first)) {
return;
}
- int tag =
ctx.checkExistKey(ctx.getTargetExprIdToFilter(), slots.get(0).getExprId()) ? 0
: 1;
- // generate runtime filter to associated expr. for
example, a = b and a = c, RF b -> a can
- // generate RF b -> c
- List<RuntimeFilter> copiedRuntimeFilter =
ctx.getFiltersByTargetExprId(slots.get(tag)
- .getExprId()).stream()
- .map(filter -> new
RuntimeFilter(generator.getNextId(), filter.getSrcExpr(),
- slots.get(tag ^ 1), filter.getType(),
filter.getExprOrder(), join))
- .collect(Collectors.toList());
- ctx.setTargetExprIdToFilters(slots.get(tag ^
1).getExprId(),
- copiedRuntimeFilter.toArray(new
RuntimeFilter[0]));
- })*/
- .forEach(expr -> legalTypes.stream()
- .map(type ->
RuntimeFilter.createRuntimeFilter(generator.getNextId(), expr,
- type, cnt.getAndIncrement(), join))
- .filter(Objects::nonNull)
- .forEach(filter ->
-
ctx.setTargetExprIdToFilters(filter.getTargetExpr().getExprId(), filter)));
+ Pair<Slot, Slot> slots = Pair.of(
+ aliasTransferMap.get((Slot)
exprs.first).second.toSlot(),
+ ((Slot) exprs.second));
+ RuntimeFilter filter = new
RuntimeFilter(generator.getNextId(),
+ slots.second, slots.first, type,
+ cnt.getAndIncrement(), join);
+ ctx.setTargetExprIdToFilter(slots.first.getExprId(),
filter);
+ ctx.setTargetsOnScanNode(aliasTransferMap.get(((Slot)
exprs.first)).first, slots.first);
+ }));
+ } else {
+ // copy to avoid bug when next call of getOutputSet()
+ Set<Slot> slots = join.getOutputSet();
+ slots.forEach(aliasTransferMap::remove);
}
- join.left().accept(this, context);
- join.right().accept(this, context);
return join;
}
// TODO: support src key is agg slot.
@Override
public PhysicalPlan visitPhysicalProject(PhysicalProject<? extends Plan>
project, CascadesContext context) {
- RuntimeFilterContext ctx = context.getRuntimeFilterContext();
+ project.child().accept(this, context);
+ Map<NamedExpression, Pair<RelationId, NamedExpression>>
aliasTransferMap
+ = context.getRuntimeFilterContext().getAliasTransferMap();
+ // change key when encounter alias.
project.getProjects().stream().filter(Alias.class::isInstance)
.map(Alias.class::cast)
- .filter(expr -> expr.child() instanceof SlotReference)
- .forEach(expr ->
ctx.setKVInNormalMap(ctx.getAliasChildToSelf(), ((SlotReference) expr.child()),
expr));
- project.child().accept(this, context);
+ .filter(alias -> alias.child() instanceof NamedExpression
+ && aliasTransferMap.containsKey((NamedExpression)
alias.child()))
+ .forEach(alias -> {
+ NamedExpression child = ((NamedExpression) alias.child());
+ aliasTransferMap.put(alias.toSlot(),
aliasTransferMap.remove(child));
+ });
return project;
}
@Override
public PhysicalOlapScan visitPhysicalOlapScan(PhysicalOlapScan scan,
CascadesContext context) {
+ // add all the slots in map.
RuntimeFilterContext ctx = context.getRuntimeFilterContext();
- scan.getOutput().stream()
- .filter(slot ->
ctx.getSlotListOfTheSameSlotAtOlapScanNode(slot).stream()
- .filter(expr ->
ctx.checkExistKey(ctx.getTargetExprIdToFilter(), expr.getExprId()))
- .peek(expr -> {
- if (expr.getExprId() == slot.getExprId()) {
- return;
- }
- List<RuntimeFilter> filters =
ctx.getFiltersByTargetExprId(expr.getExprId());
- ctx.removeFilters(expr.getExprId());
- filters.forEach(filter ->
filter.setTargetSlot(slot));
-
ctx.setKVInNormalMap(ctx.getTargetExprIdToFilter(), slot.getExprId(), filters);
- })
- .count() > 0)
- .forEach(slot -> ctx.setTargetsOnScanNode(scan.getId(), slot));
+ scan.getOutput().forEach(slot ->
+ ctx.setKVInNormalMap(ctx.getAliasTransferMap(), slot,
Pair.of(scan.getId(), slot)));
Review Comment:
setKVInNormalMap function is very weird.
##########
fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/RuntimeFilterGenerator.java:
##########
@@ -72,94 +78,78 @@ public class RuntimeFilterGenerator extends
PlanPostProcessor {
public PhysicalPlan visitPhysicalHashJoin(PhysicalHashJoin<? extends Plan,
? extends Plan> join,
CascadesContext context) {
RuntimeFilterContext ctx = context.getRuntimeFilterContext();
- if (deniedJoinType.contains(join.getJoinType())) {
- /* TODO: translate left outer join to inner join if there are
inner join ancestors
- * if it has encountered inner join, like
- * a=b
- * / \
- * / \
- * / \
- * / \
- * left join-->a=c b
- * / \
- * / \
- * / \
- * / \
- * a c
- * runtime filter whose src expr is b can take effect on c.
- * but now checking the inner join is unsupported. we may support
it at later version.
- */
- join.getOutput().forEach(slot ->
ctx.removeFilters(slot.getExprId()));
- } else {
+ Map<NamedExpression, Pair<RelationId, NamedExpression>>
aliasTransferMap = ctx.getAliasTransferMap();
+ join.right().accept(this, context);
+ join.left().accept(this, context);
+ if (!deniedJoinType.contains(join.getJoinType())) {
List<TRuntimeFilterType> legalTypes =
Arrays.stream(TRuntimeFilterType.values()).filter(type ->
(type.getValue() &
ctx.getSessionVariable().getRuntimeFilterType()) > 0)
.collect(Collectors.toList());
AtomicInteger cnt = new AtomicInteger();
join.getHashJoinConjuncts().stream()
.map(EqualTo.class::cast)
+ // TODO: some complex situation cannot be handled now, see
testPushDownThroughJoin.
// TODO: we will support it in later version.
- /*.peek(expr -> {
- // target is always the expr at the two side of equal
of hash conjunctions.
- // TODO: some complex situation cannot be handled now,
see testPushDownThroughJoin.
- List<SlotReference> slots =
expr.children().stream().filter(SlotReference.class::isInstance)
-
.map(SlotReference.class::cast).collect(Collectors.toList());
- if (slots.size() != 2
- ||
!(ctx.checkExistKey(ctx.getTargetExprIdToFilter(), slots.get(0).getExprId())
- ||
ctx.checkExistKey(ctx.getTargetExprIdToFilter(), slots.get(1).getExprId()))) {
+ .forEach(expr -> legalTypes.forEach(type -> {
+ Pair<Expression, Expression> exprs =
checkAndMaybeSwapChild(expr, join);
Review Comment:
give exprs a better name
##########
fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/RuntimeFilterGenerator.java:
##########
@@ -72,94 +78,78 @@ public class RuntimeFilterGenerator extends
PlanPostProcessor {
public PhysicalPlan visitPhysicalHashJoin(PhysicalHashJoin<? extends Plan,
? extends Plan> join,
CascadesContext context) {
RuntimeFilterContext ctx = context.getRuntimeFilterContext();
- if (deniedJoinType.contains(join.getJoinType())) {
- /* TODO: translate left outer join to inner join if there are
inner join ancestors
- * if it has encountered inner join, like
- * a=b
- * / \
- * / \
- * / \
- * / \
- * left join-->a=c b
- * / \
- * / \
- * / \
- * / \
- * a c
- * runtime filter whose src expr is b can take effect on c.
- * but now checking the inner join is unsupported. we may support
it at later version.
- */
- join.getOutput().forEach(slot ->
ctx.removeFilters(slot.getExprId()));
- } else {
+ Map<NamedExpression, Pair<RelationId, NamedExpression>>
aliasTransferMap = ctx.getAliasTransferMap();
+ join.right().accept(this, context);
+ join.left().accept(this, context);
+ if (!deniedJoinType.contains(join.getJoinType())) {
List<TRuntimeFilterType> legalTypes =
Arrays.stream(TRuntimeFilterType.values()).filter(type ->
(type.getValue() &
ctx.getSessionVariable().getRuntimeFilterType()) > 0)
.collect(Collectors.toList());
AtomicInteger cnt = new AtomicInteger();
join.getHashJoinConjuncts().stream()
.map(EqualTo.class::cast)
+ // TODO: some complex situation cannot be handled now, see
testPushDownThroughJoin.
// TODO: we will support it in later version.
- /*.peek(expr -> {
- // target is always the expr at the two side of equal
of hash conjunctions.
- // TODO: some complex situation cannot be handled now,
see testPushDownThroughJoin.
- List<SlotReference> slots =
expr.children().stream().filter(SlotReference.class::isInstance)
-
.map(SlotReference.class::cast).collect(Collectors.toList());
- if (slots.size() != 2
- ||
!(ctx.checkExistKey(ctx.getTargetExprIdToFilter(), slots.get(0).getExprId())
- ||
ctx.checkExistKey(ctx.getTargetExprIdToFilter(), slots.get(1).getExprId()))) {
+ .forEach(expr -> legalTypes.forEach(type -> {
+ Pair<Expression, Expression> exprs =
checkAndMaybeSwapChild(expr, join);
+ if (exprs == null ||
!aliasTransferMap.containsKey((Slot) exprs.first)) {
return;
}
- int tag =
ctx.checkExistKey(ctx.getTargetExprIdToFilter(), slots.get(0).getExprId()) ? 0
: 1;
- // generate runtime filter to associated expr. for
example, a = b and a = c, RF b -> a can
- // generate RF b -> c
- List<RuntimeFilter> copiedRuntimeFilter =
ctx.getFiltersByTargetExprId(slots.get(tag)
- .getExprId()).stream()
- .map(filter -> new
RuntimeFilter(generator.getNextId(), filter.getSrcExpr(),
- slots.get(tag ^ 1), filter.getType(),
filter.getExprOrder(), join))
- .collect(Collectors.toList());
- ctx.setTargetExprIdToFilters(slots.get(tag ^
1).getExprId(),
- copiedRuntimeFilter.toArray(new
RuntimeFilter[0]));
- })*/
- .forEach(expr -> legalTypes.stream()
- .map(type ->
RuntimeFilter.createRuntimeFilter(generator.getNextId(), expr,
- type, cnt.getAndIncrement(), join))
- .filter(Objects::nonNull)
- .forEach(filter ->
-
ctx.setTargetExprIdToFilters(filter.getTargetExpr().getExprId(), filter)));
+ Pair<Slot, Slot> slots = Pair.of(
+ aliasTransferMap.get((Slot)
exprs.first).second.toSlot(),
+ ((Slot) exprs.second));
+ RuntimeFilter filter = new
RuntimeFilter(generator.getNextId(),
+ slots.second, slots.first, type,
+ cnt.getAndIncrement(), join);
+ ctx.setTargetExprIdToFilter(slots.first.getExprId(),
filter);
+ ctx.setTargetsOnScanNode(aliasTransferMap.get(((Slot)
exprs.first)).first, slots.first);
+ }));
+ } else {
+ // copy to avoid bug when next call of getOutputSet()
+ Set<Slot> slots = join.getOutputSet();
+ slots.forEach(aliasTransferMap::remove);
}
- join.left().accept(this, context);
- join.right().accept(this, context);
return join;
}
// TODO: support src key is agg slot.
@Override
public PhysicalPlan visitPhysicalProject(PhysicalProject<? extends Plan>
project, CascadesContext context) {
- RuntimeFilterContext ctx = context.getRuntimeFilterContext();
+ project.child().accept(this, context);
+ Map<NamedExpression, Pair<RelationId, NamedExpression>>
aliasTransferMap
+ = context.getRuntimeFilterContext().getAliasTransferMap();
+ // change key when encounter alias.
project.getProjects().stream().filter(Alias.class::isInstance)
.map(Alias.class::cast)
- .filter(expr -> expr.child() instanceof SlotReference)
- .forEach(expr ->
ctx.setKVInNormalMap(ctx.getAliasChildToSelf(), ((SlotReference) expr.child()),
expr));
- project.child().accept(this, context);
+ .filter(alias -> alias.child() instanceof NamedExpression
+ && aliasTransferMap.containsKey((NamedExpression)
alias.child()))
+ .forEach(alias -> {
+ NamedExpression child = ((NamedExpression) alias.child());
+ aliasTransferMap.put(alias.toSlot(),
aliasTransferMap.remove(child));
+ });
return project;
}
@Override
public PhysicalOlapScan visitPhysicalOlapScan(PhysicalOlapScan scan,
CascadesContext context) {
+ // add all the slots in map.
RuntimeFilterContext ctx = context.getRuntimeFilterContext();
- scan.getOutput().stream()
- .filter(slot ->
ctx.getSlotListOfTheSameSlotAtOlapScanNode(slot).stream()
- .filter(expr ->
ctx.checkExistKey(ctx.getTargetExprIdToFilter(), expr.getExprId()))
- .peek(expr -> {
- if (expr.getExprId() == slot.getExprId()) {
- return;
- }
- List<RuntimeFilter> filters =
ctx.getFiltersByTargetExprId(expr.getExprId());
- ctx.removeFilters(expr.getExprId());
- filters.forEach(filter ->
filter.setTargetSlot(slot));
-
ctx.setKVInNormalMap(ctx.getTargetExprIdToFilter(), slot.getExprId(), filters);
- })
- .count() > 0)
- .forEach(slot -> ctx.setTargetsOnScanNode(scan.getId(), slot));
+ scan.getOutput().forEach(slot ->
+ ctx.setKVInNormalMap(ctx.getAliasTransferMap(), slot,
Pair.of(scan.getId(), slot)));
return scan;
}
+
+ private static Pair<Expression, Expression> checkAndMaybeSwapChild(EqualTo
expr,
+ PhysicalHashJoin join) {
+ if (expr.children().stream().anyMatch(Literal.class::isInstance)
+ || expr.child(0).equals(expr.child(1))
+ ||
!expr.children().stream().allMatch(SlotReference.class::isInstance)) {
Review Comment:
the last condition has covered the first condition
##########
fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/RuntimeFilterGenerator.java:
##########
@@ -72,94 +78,78 @@ public class RuntimeFilterGenerator extends
PlanPostProcessor {
public PhysicalPlan visitPhysicalHashJoin(PhysicalHashJoin<? extends Plan,
? extends Plan> join,
CascadesContext context) {
RuntimeFilterContext ctx = context.getRuntimeFilterContext();
- if (deniedJoinType.contains(join.getJoinType())) {
- /* TODO: translate left outer join to inner join if there are
inner join ancestors
- * if it has encountered inner join, like
- * a=b
- * / \
- * / \
- * / \
- * / \
- * left join-->a=c b
- * / \
- * / \
- * / \
- * / \
- * a c
- * runtime filter whose src expr is b can take effect on c.
- * but now checking the inner join is unsupported. we may support
it at later version.
- */
- join.getOutput().forEach(slot ->
ctx.removeFilters(slot.getExprId()));
- } else {
+ Map<NamedExpression, Pair<RelationId, NamedExpression>>
aliasTransferMap = ctx.getAliasTransferMap();
+ join.right().accept(this, context);
+ join.left().accept(this, context);
+ if (!deniedJoinType.contains(join.getJoinType())) {
List<TRuntimeFilterType> legalTypes =
Arrays.stream(TRuntimeFilterType.values()).filter(type ->
(type.getValue() &
ctx.getSessionVariable().getRuntimeFilterType()) > 0)
.collect(Collectors.toList());
AtomicInteger cnt = new AtomicInteger();
join.getHashJoinConjuncts().stream()
.map(EqualTo.class::cast)
+ // TODO: some complex situation cannot be handled now, see
testPushDownThroughJoin.
// TODO: we will support it in later version.
- /*.peek(expr -> {
- // target is always the expr at the two side of equal
of hash conjunctions.
- // TODO: some complex situation cannot be handled now,
see testPushDownThroughJoin.
- List<SlotReference> slots =
expr.children().stream().filter(SlotReference.class::isInstance)
-
.map(SlotReference.class::cast).collect(Collectors.toList());
- if (slots.size() != 2
- ||
!(ctx.checkExistKey(ctx.getTargetExprIdToFilter(), slots.get(0).getExprId())
- ||
ctx.checkExistKey(ctx.getTargetExprIdToFilter(), slots.get(1).getExprId()))) {
+ .forEach(expr -> legalTypes.forEach(type -> {
+ Pair<Expression, Expression> exprs =
checkAndMaybeSwapChild(expr, join);
+ if (exprs == null ||
!aliasTransferMap.containsKey((Slot) exprs.first)) {
return;
}
- int tag =
ctx.checkExistKey(ctx.getTargetExprIdToFilter(), slots.get(0).getExprId()) ? 0
: 1;
- // generate runtime filter to associated expr. for
example, a = b and a = c, RF b -> a can
- // generate RF b -> c
- List<RuntimeFilter> copiedRuntimeFilter =
ctx.getFiltersByTargetExprId(slots.get(tag)
- .getExprId()).stream()
- .map(filter -> new
RuntimeFilter(generator.getNextId(), filter.getSrcExpr(),
- slots.get(tag ^ 1), filter.getType(),
filter.getExprOrder(), join))
- .collect(Collectors.toList());
- ctx.setTargetExprIdToFilters(slots.get(tag ^
1).getExprId(),
- copiedRuntimeFilter.toArray(new
RuntimeFilter[0]));
- })*/
- .forEach(expr -> legalTypes.stream()
- .map(type ->
RuntimeFilter.createRuntimeFilter(generator.getNextId(), expr,
- type, cnt.getAndIncrement(), join))
- .filter(Objects::nonNull)
- .forEach(filter ->
-
ctx.setTargetExprIdToFilters(filter.getTargetExpr().getExprId(), filter)));
+ Pair<Slot, Slot> slots = Pair.of(
+ aliasTransferMap.get((Slot)
exprs.first).second.toSlot(),
+ ((Slot) exprs.second));
+ RuntimeFilter filter = new
RuntimeFilter(generator.getNextId(),
+ slots.second, slots.first, type,
+ cnt.getAndIncrement(), join);
+ ctx.setTargetExprIdToFilter(slots.first.getExprId(),
filter);
+ ctx.setTargetsOnScanNode(aliasTransferMap.get(((Slot)
exprs.first)).first, slots.first);
+ }));
+ } else {
+ // copy to avoid bug when next call of getOutputSet()
+ Set<Slot> slots = join.getOutputSet();
+ slots.forEach(aliasTransferMap::remove);
}
- join.left().accept(this, context);
- join.right().accept(this, context);
return join;
}
// TODO: support src key is agg slot.
@Override
public PhysicalPlan visitPhysicalProject(PhysicalProject<? extends Plan>
project, CascadesContext context) {
- RuntimeFilterContext ctx = context.getRuntimeFilterContext();
+ project.child().accept(this, context);
+ Map<NamedExpression, Pair<RelationId, NamedExpression>>
aliasTransferMap
+ = context.getRuntimeFilterContext().getAliasTransferMap();
+ // change key when encounter alias.
project.getProjects().stream().filter(Alias.class::isInstance)
.map(Alias.class::cast)
- .filter(expr -> expr.child() instanceof SlotReference)
- .forEach(expr ->
ctx.setKVInNormalMap(ctx.getAliasChildToSelf(), ((SlotReference) expr.child()),
expr));
- project.child().accept(this, context);
+ .filter(alias -> alias.child() instanceof NamedExpression
+ && aliasTransferMap.containsKey((NamedExpression)
alias.child()))
+ .forEach(alias -> {
+ NamedExpression child = ((NamedExpression) alias.child());
+ aliasTransferMap.put(alias.toSlot(),
aliasTransferMap.remove(child));
+ });
return project;
}
@Override
public PhysicalOlapScan visitPhysicalOlapScan(PhysicalOlapScan scan,
CascadesContext context) {
+ // add all the slots in map.
RuntimeFilterContext ctx = context.getRuntimeFilterContext();
- scan.getOutput().stream()
- .filter(slot ->
ctx.getSlotListOfTheSameSlotAtOlapScanNode(slot).stream()
- .filter(expr ->
ctx.checkExistKey(ctx.getTargetExprIdToFilter(), expr.getExprId()))
- .peek(expr -> {
- if (expr.getExprId() == slot.getExprId()) {
- return;
- }
- List<RuntimeFilter> filters =
ctx.getFiltersByTargetExprId(expr.getExprId());
- ctx.removeFilters(expr.getExprId());
- filters.forEach(filter ->
filter.setTargetSlot(slot));
-
ctx.setKVInNormalMap(ctx.getTargetExprIdToFilter(), slot.getExprId(), filters);
- })
- .count() > 0)
- .forEach(slot -> ctx.setTargetsOnScanNode(scan.getId(), slot));
+ scan.getOutput().forEach(slot ->
+ ctx.setKVInNormalMap(ctx.getAliasTransferMap(), slot,
Pair.of(scan.getId(), slot)));
return scan;
}
+
+ private static Pair<Expression, Expression> checkAndMaybeSwapChild(EqualTo
expr,
Review Comment:
u could use JoinUtils#swapEqualToForChildrenOrder
##########
fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/RuntimeFilterContext.java:
##########
@@ -86,24 +87,19 @@ public FilterSizeLimits getLimits() {
return limits;
}
- public void setTargetExprIdToFilters(ExprId id, RuntimeFilter... filters) {
- Preconditions.checkArgument(Arrays.stream(filters)
- .allMatch(filter -> filter.getTargetExpr().getExprId() == id));
+ public void setTargetExprIdToFilter(ExprId id, RuntimeFilter filter) {
+ Preconditions.checkArgument(filter.getTargetExpr().getExprId() == id);
this.targetExprIdToFilter.computeIfAbsent(id, k ->
Lists.newArrayList())
- .addAll(Arrays.asList(filters));
- }
-
- public List<RuntimeFilter> getFiltersByTargetExprId(ExprId id) {
- return targetExprIdToFilter.get(id);
+ .add(filter);
Review Comment:
not need wrap line
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]