This is an automated email from the ASF dual-hosted git repository.
morrysnow pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 4117f0b93b [improve](nereids) Support outer rf into inner left outer
join (#21368)
4117f0b93b is described below
commit 4117f0b93b16145e6a3940ce21e38a73cde8eb45
Author: xzj7019 <[email protected]>
AuthorDate: Fri Jun 30 19:07:39 2023 +0800
[improve](nereids) Support outer rf into inner left outer join (#21368)
Support pushing a runtime filter (rf) into a left outer join from an enclosing join of an allowed type.
Before this PR, some join types, such as full outer join, did not allow
any rf pushing at all.
For example, in (a left join b on a.id = b.id) inner join c on a.id2 = c.id2,
the rf push from c.id2 to the inner table a was lost.
This PR lifts that limitation, so an rf can be pushed into a left outer join
from an enclosing join of an allowed type.
---
.../processor/post/RuntimeFilterGenerator.java | 59 +++++++++++++++++-----
.../nereids/postprocess/RuntimeFilterTest.java | 8 +--
2 files changed, 49 insertions(+), 18 deletions(-)
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/RuntimeFilterGenerator.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/RuntimeFilterGenerator.java
index 19561f9ec9..524ba17ba0 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/RuntimeFilterGenerator.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/RuntimeFilterGenerator.java
@@ -35,6 +35,7 @@ import org.apache.doris.nereids.trees.plans.AbstractPlan;
import org.apache.doris.nereids.trees.plans.JoinType;
import org.apache.doris.nereids.trees.plans.Plan;
import org.apache.doris.nereids.trees.plans.logical.LogicalCTEConsumer;
+import org.apache.doris.nereids.trees.plans.physical.AbstractPhysicalJoin;
import org.apache.doris.nereids.trees.plans.physical.PhysicalCTEConsumer;
import org.apache.doris.nereids.trees.plans.physical.PhysicalCTEProducer;
import org.apache.doris.nereids.trees.plans.physical.PhysicalDistribute;
@@ -115,8 +116,22 @@ public class RuntimeFilterGenerator extends
PlanPostProcessor {
join.right().accept(this, context);
join.left().accept(this, context);
if (DENIED_JOIN_TYPES.contains(join.getJoinType()) ||
join.isMarkJoin()) {
- Set<Slot> slots = join.getOutputSet();
- slots.forEach(aliasTransferMap::remove);
+ // aliasTransMap is also used for judging whether the slot can be
as rf target.
+ // for denied join type, the forbidden slots will be removed from
the map.
+ // for example: a full outer join b on a.id = b.id, all slots will
be removed out.
+ // for left outer join, only remove the right side slots and leave
the left side.
+ // in later visit, the additional checking for the join type will
be invoked for different cases:
+ // case 1: a left join b on a.id = b.id, checking whether rf on
b.id can be pushed to a, the answer is no,
+ // since current join type is left outer join which is in
denied list;
+ // case 2: (a left join b on a.id = b.id) inner join c on a.id2 =
c.id2, checking whether rf on c.id2 can
+ // be pushed to a, the answer is yes, since the current
join is inner join which is permitted.
+ if (join.getJoinType() == JoinType.LEFT_OUTER_JOIN) {
+ Set<Slot> slots = join.right().getOutputSet();
+ slots.forEach(aliasTransferMap::remove);
+ } else {
+ Set<Slot> slots = join.getOutputSet();
+ slots.forEach(aliasTransferMap::remove);
+ }
} else {
collectPushDownCTEInfos(join, context);
if (!getPushDownCTECandidates(ctx).isEmpty()) {
@@ -179,17 +194,18 @@ public class RuntimeFilterGenerator extends
PlanPostProcessor {
TRuntimeFilterType type = TRuntimeFilterType.BITMAP;
Set<Slot> targetSlots = bitmapContains.child(1).getInputSlots();
for (Slot targetSlot : targetSlots) {
- if (targetSlot != null &&
aliasTransferMap.containsKey(targetSlot)) {
- Slot olapScanSlot =
aliasTransferMap.get(targetSlot).second;
- RuntimeFilter filter = new
RuntimeFilter(generator.getNextId(),
- bitmapContains.child(0),
ImmutableList.of(olapScanSlot),
- ImmutableList.of(bitmapContains.child(1)), type,
i, join, isNot, -1L);
- ctx.addJoinToTargetMap(join, olapScanSlot.getExprId());
- ctx.setTargetExprIdToFilter(olapScanSlot.getExprId(),
filter);
-
ctx.setTargetsOnScanNode(aliasTransferMap.get(targetSlot).first.getId(),
- olapScanSlot);
-
join.addBitmapRuntimeFilterCondition(bitmapRuntimeFilterCondition);
+ if (!checkCanPushDownFromJoinType(join, ctx, targetSlot)) {
+ continue;
}
+ Slot olapScanSlot = aliasTransferMap.get(targetSlot).second;
+ RuntimeFilter filter = new RuntimeFilter(generator.getNextId(),
+ bitmapContains.child(0),
ImmutableList.of(olapScanSlot),
+ ImmutableList.of(bitmapContains.child(1)), type, i,
join, isNot, -1L);
+ ctx.addJoinToTargetMap(join, olapScanSlot.getExprId());
+ ctx.setTargetExprIdToFilter(olapScanSlot.getExprId(), filter);
+
ctx.setTargetsOnScanNode(aliasTransferMap.get(targetSlot).first.getId(),
+ olapScanSlot);
+
join.addBitmapRuntimeFilterCondition(bitmapRuntimeFilterCondition);
}
}
return join;
@@ -275,7 +291,7 @@ public class RuntimeFilterGenerator extends
PlanPostProcessor {
Slot unwrappedSlot = checkTargetChild(equalTo.left());
// aliasTransMap doesn't contain the key, means that the path from the
olap scan to the join
// contains join with denied join type. for example: a left join b on
a.id = b.id
- if (unwrappedSlot == null ||
!aliasTransferMap.containsKey(unwrappedSlot)) {
+ if (!checkCanPushDownFromJoinType(join, ctx, unwrappedSlot)) {
return;
}
Slot olapScanSlot = aliasTransferMap.get(unwrappedSlot).second;
@@ -342,6 +358,9 @@ public class RuntimeFilterGenerator extends
PlanPostProcessor {
origSlot = (SlotReference) targetExpr;
}
Slot olapScanSlot = aliasTransferMap.get(origSlot).second;
+ if (!checkCanPushDownFromJoinType(join, ctx, olapScanSlot)) {
+ continue;
+ }
PhysicalRelation scan = aliasTransferMap.get(origSlot).first;
if (type == TRuntimeFilterType.IN_OR_BLOOM
&& ctx.getSessionVariable().enablePipelineEngine()
@@ -563,7 +582,7 @@ public class RuntimeFilterGenerator extends
PlanPostProcessor {
Slot unwrappedSlot = checkTargetChild(equalTo.left());
// aliasTransMap doesn't contain the key, means that the path from the
olap scan to the join
// contains join with denied join type. for example: a left join b on
a.id = b.id
- if (unwrappedSlot == null ||
!aliasTransferMap.containsKey(unwrappedSlot)) {
+ if (!checkCanPushDownFromJoinType(join, ctx, unwrappedSlot)) {
return;
}
Slot cteSlot = aliasTransferMap.get(unwrappedSlot).second;
@@ -608,6 +627,18 @@ public class RuntimeFilterGenerator extends
PlanPostProcessor {
}
}
+ private boolean checkCanPushDownFromJoinType(AbstractPhysicalJoin
physicalJoin,
+ RuntimeFilterContext ctx, Slot slot) {
+ Map<NamedExpression, Pair<PhysicalRelation, Slot>> aliasTransferMap =
ctx.getAliasTransferMap();
+ if (slot == null || !aliasTransferMap.containsKey(slot)) {
+ return false;
+ } else if (DENIED_JOIN_TYPES.contains(physicalJoin.getJoinType()) ||
physicalJoin.isMarkJoin()) {
+ return false;
+ } else {
+ return true;
+ }
+ }
+
private boolean checkCanPushDownIntoBasicTable(PhysicalPlan root) {
// only support spj currently
List<PhysicalPlan> plans = Lists.newArrayList();
diff --git
a/fe/fe-core/src/test/java/org/apache/doris/nereids/postprocess/RuntimeFilterTest.java
b/fe/fe-core/src/test/java/org/apache/doris/nereids/postprocess/RuntimeFilterTest.java
index 699df19ba4..fc08b0c35d 100644
---
a/fe/fe-core/src/test/java/org/apache/doris/nereids/postprocess/RuntimeFilterTest.java
+++
b/fe/fe-core/src/test/java/org/apache/doris/nereids/postprocess/RuntimeFilterTest.java
@@ -101,9 +101,9 @@ public class RuntimeFilterTest extends SSBTestBase {
+ " inner join (select c_custkey from customer inner join
supplier on c_custkey = s_suppkey) b"
+ " on b.c_custkey = a.lo_custkey";
List<RuntimeFilter> filters = getRuntimeFilters(sql).get();
- Assertions.assertEquals(1, filters.size());
+ Assertions.assertEquals(2, filters.size());
checkRuntimeFilterExprs(filters, ImmutableList.of(
- Pair.of("s_suppkey", "c_custkey")));
+ Pair.of("s_suppkey", "c_custkey"), Pair.of("c_custkey",
"lo_custkey")));
}
@Test
@@ -207,10 +207,10 @@ public class RuntimeFilterTest extends SSBTestBase {
+ " on b.c_custkey = a.lo_custkey) c inner join (select
lo_custkey from customer inner join lineorder"
+ " on c_custkey = lo_custkey) d on c.c_custkey =
d.lo_custkey";
List<RuntimeFilter> filters = getRuntimeFilters(sql).get();
- Assertions.assertEquals(3, filters.size());
+ Assertions.assertEquals(4, filters.size());
checkRuntimeFilterExprs(filters, ImmutableList.of(
Pair.of("c_custkey", "lo_custkey"), Pair.of("d_datekey",
"lo_orderdate"),
- Pair.of("lo_custkey", "c_custkey")));
+ Pair.of("lo_custkey", "c_custkey"), Pair.of("lo_custkey",
"c_custkey")));
}
@Test
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]