This is an automated email from the ASF dual-hosted git repository.
englefly pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new f4f950b9bd9 [fix](nereids) push more than one runtime filter into cte
(#30901)
f4f950b9bd9 is described below
commit f4f950b9bd934771074001c23feae67a9793d120
Author: minghong <[email protected]>
AuthorDate: Wed Feb 21 09:55:30 2024 +0800
[fix](nereids) push more than one runtime filter into cte (#30901)
* push every runtime filter (RF) built at the deepest join into the CTE producer, instead of only one; used by TPC-DS query 95
---
.../glue/translator/PhysicalPlanTranslator.java | 6 +--
.../glue/translator/RuntimeFilterTranslator.java | 7 ---
.../processor/post/RuntimeFilterContext.java | 41 ++++++++--------
.../processor/post/RuntimeFilterGenerator.java | 57 ++++++++++++----------
.../nereids/postprocess/RuntimeFilterTest.java | 2 +-
.../noStatsRfPrune/query95.out | 2 +-
.../no_stats_shape/query95.out | 2 +-
7 files changed, 58 insertions(+), 59 deletions(-)
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java
index e1571778a15..c9cb534c9c2 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java
@@ -1284,8 +1284,7 @@ public class PhysicalPlanTranslator extends
DefaultPlanVisitor<PlanFragment, Pla
.forEach(s -> rightChildOutputMap.put(s.getExprId(), s));
// translate runtime filter
- context.getRuntimeTranslator().ifPresent(runtimeFilterTranslator ->
runtimeFilterTranslator
- .getRuntimeFilterOfHashJoinNode(physicalHashJoin)
+ context.getRuntimeTranslator().ifPresent(runtimeFilterTranslator ->
physicalHashJoin.getRuntimeFilters()
.forEach(filter ->
runtimeFilterTranslator.createLegacyRuntimeFilter(filter, hashJoinNode,
context)));
// make intermediate tuple
@@ -1484,8 +1483,7 @@ public class PhysicalPlanTranslator extends
DefaultPlanVisitor<PlanFragment, Pla
}
// translate runtime filter
context.getRuntimeTranslator().ifPresent(runtimeFilterTranslator
-> {
- Set<RuntimeFilter> filters = runtimeFilterTranslator
- .getRuntimeFilterOfHashJoinNode(nestedLoopJoin);
+ List<RuntimeFilter> filters =
nestedLoopJoin.getRuntimeFilters();
filters.forEach(filter -> runtimeFilterTranslator
.createLegacyRuntimeFilter(filter, nestedLoopJoinNode,
context));
if (filters.stream().anyMatch(filter -> filter.getType() ==
TRuntimeFilterType.BITMAP)) {
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/RuntimeFilterTranslator.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/RuntimeFilterTranslator.java
index a2fbc28ecd8..787a2bd8181 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/RuntimeFilterTranslator.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/RuntimeFilterTranslator.java
@@ -28,7 +28,6 @@ import org.apache.doris.nereids.trees.expressions.ExprId;
import org.apache.doris.nereids.trees.expressions.Expression;
import org.apache.doris.nereids.trees.expressions.Slot;
import org.apache.doris.nereids.trees.expressions.SlotReference;
-import org.apache.doris.nereids.trees.plans.physical.AbstractPhysicalJoin;
import org.apache.doris.nereids.trees.plans.physical.RuntimeFilter;
import org.apache.doris.planner.CTEScanNode;
import org.apache.doris.planner.DataStreamSink;
@@ -47,7 +46,6 @@ import com.google.common.collect.ImmutableMap;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
-import java.util.Set;
/**
* translate runtime filter
@@ -58,11 +56,6 @@ public class RuntimeFilterTranslator {
public RuntimeFilterTranslator(RuntimeFilterContext context) {
this.context = context;
- context.generatePhysicalHashJoinToRuntimeFilter();
- }
-
- public Set<RuntimeFilter>
getRuntimeFilterOfHashJoinNode(AbstractPhysicalJoin join) {
- return context.getRuntimeFilterOnHashJoinNode(join);
}
public RuntimeFilterContext getContext() {
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/RuntimeFilterContext.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/RuntimeFilterContext.java
index eb5767fc1f6..bf137e3c580 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/RuntimeFilterContext.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/RuntimeFilterContext.java
@@ -110,8 +110,6 @@ public class RuntimeFilterContext {
// exprId to olap scan node slotRef because the slotRef will be changed
when translating.
private final Map<ExprId, SlotRef> exprIdToOlapScanNodeSlotRef =
Maps.newHashMap();
- private final Map<AbstractPhysicalJoin, Set<RuntimeFilter>>
runtimeFilterOnHashJoinNode = Maps.newHashMap();
-
// alias -> alias's child, if there's a key that is alias's child, the
key-value will change by this way
// Alias(A) = B, now B -> A in map, and encounter Alias(B) -> C, the kv
will be C -> A.
// you can see disjoint set data structure to learn the processing
detailed.
@@ -191,19 +189,31 @@ public class RuntimeFilterContext {
public void removeFilter(ExprId targetId, PhysicalHashJoin builderNode) {
List<RuntimeFilter> filters = targetExprIdToFilter.get(targetId);
if (filters != null) {
- Iterator<RuntimeFilter> iter = filters.iterator();
- while (iter.hasNext()) {
- RuntimeFilter rf = iter.next();
+ Iterator<RuntimeFilter> filterIter = filters.iterator();
+ while (filterIter.hasNext()) {
+ RuntimeFilter rf = filterIter.next();
if (rf.getBuilderNode().equals(builderNode)) {
- builderNode.getRuntimeFilters().remove(rf);
- for (int i = 0; i < rf.getTargetSlots().size(); i++) {
- Slot targetSlot = rf.getTargetSlots().get(i);
+ Iterator<Slot> targetSlotIter =
rf.getTargetSlots().listIterator();
+ Iterator<PhysicalRelation> targetScanIter =
rf.getTargetScans().iterator();
+ Iterator<Expression> targetExpressionIter =
rf.getTargetExpressions().iterator();
+ Slot targetSlot;
+ PhysicalRelation targetScan;
+ while (targetScanIter.hasNext() &&
targetSlotIter.hasNext() && targetExpressionIter.hasNext()) {
+ targetExpressionIter.next();
+ targetScan = targetScanIter.next();
+ targetSlot = targetSlotIter.next();
if (targetSlot.getExprId().equals(targetId)) {
-
rf.getTargetScans().get(i).removeAppliedRuntimeFilter(rf);
+ targetScan.removeAppliedRuntimeFilter(rf);
+ targetExpressionIter.remove();
+ targetScanIter.remove();
+ targetSlotIter.remove();
}
}
- iter.remove();
- prunedRF.add(rf);
+ if (rf.getTargetSlots().isEmpty()) {
+ builderNode.getRuntimeFilters().remove(rf);
+ filterIter.remove();
+ prunedRF.add(rf);
+ }
}
}
}
@@ -255,15 +265,6 @@ public class RuntimeFilterContext {
return scanNodeOfLegacyRuntimeFilterTarget;
}
- public Set<RuntimeFilter>
getRuntimeFilterOnHashJoinNode(AbstractPhysicalJoin join) {
- return runtimeFilterOnHashJoinNode.getOrDefault(join,
Collections.emptySet());
- }
-
- public void generatePhysicalHashJoinToRuntimeFilter() {
- targetExprIdToFilter.values().forEach(filters ->
filters.forEach(filter -> runtimeFilterOnHashJoinNode
- .computeIfAbsent(filter.getBuilderNode(), k ->
Sets.newHashSet()).add(filter)));
- }
-
public Map<ExprId, List<RuntimeFilter>> getTargetExprIdToFilter() {
return targetExprIdToFilter;
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/RuntimeFilterGenerator.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/RuntimeFilterGenerator.java
index d4c5a96d3c5..5a7fbecb6ec 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/RuntimeFilterGenerator.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/RuntimeFilterGenerator.java
@@ -158,7 +158,7 @@ public class RuntimeFilterGenerator extends
PlanPostProcessor {
// the most right deep buildNode from
rfsToPushDown is used as buildNode for pushDown rf
// since the srcExpr are the same, all buildNodes
of rfToPushDown are in the same tree path
// the longest ancestors means its corresponding
rf build node is the most right deep one.
- RuntimeFilter rightDeep = rfsToPushDown.get(0);
+ List<RuntimeFilter> rightDeepRfs =
Lists.newArrayList();
List<Plan> rightDeepAncestors =
rfsToPushDown.get(0).getBuilderNode().getAncestors();
int rightDeepAncestorsSize =
rightDeepAncestors.size();
RuntimeFilter leftTop = rfsToPushDown.get(0);
@@ -166,10 +166,15 @@ public class RuntimeFilterGenerator extends
PlanPostProcessor {
for (RuntimeFilter rf : rfsToPushDown) {
List<Plan> ancestors =
rf.getBuilderNode().getAncestors();
int currentAncestorsSize = ancestors.size();
- if (currentAncestorsSize >
rightDeepAncestorsSize) {
- rightDeep = rf;
- rightDeepAncestorsSize =
currentAncestorsSize;
- rightDeepAncestors = ancestors;
+ if (currentAncestorsSize >=
rightDeepAncestorsSize) {
+ if (currentAncestorsSize ==
rightDeepAncestorsSize) {
+ rightDeepRfs.add(rf);
+ } else {
+ rightDeepAncestorsSize =
currentAncestorsSize;
+ rightDeepAncestors = ancestors;
+ rightDeepRfs.clear();
+ rightDeepRfs.add(rf);
+ }
}
if (currentAncestorsSize <
leftTopAncestorsSize) {
leftTopAncestorsSize =
currentAncestorsSize;
@@ -187,7 +192,7 @@ public class RuntimeFilterGenerator extends
PlanPostProcessor {
if (cursor instanceof AbstractPhysicalJoin) {
AbstractPhysicalJoin cursorJoin =
(AbstractPhysicalJoin) cursor;
valid =
(!RuntimeFilterGenerator.DENIED_JOIN_TYPES
-
.contains(cursorJoin.getJoinType())
+ .contains(cursorJoin.getJoinType())
|| cursorJoin.isMarkJoin()) &&
valid;
}
if (!valid) {
@@ -199,27 +204,29 @@ public class RuntimeFilterGenerator extends
PlanPostProcessor {
break;
}
- Expression rightDeepTargetExpressionOnCTE = null;
- int targetCount =
rightDeep.getTargetExpressions().size();
- for (int i = 0; i < targetCount; i++) {
- PhysicalRelation rel =
rightDeep.getTargetScans().get(i);
- if (rel instanceof PhysicalCTEConsumer
- && ((PhysicalCTEConsumer)
rel).getCteId().equals(cteId)) {
- rightDeepTargetExpressionOnCTE =
rightDeep.getTargetExpressions().get(i);
- break;
+ for (RuntimeFilter rfToPush : rightDeepRfs) {
+ Expression rightDeepTargetExpressionOnCTE =
null;
+ int targetCount =
rfToPush.getTargetExpressions().size();
+ for (int i = 0; i < targetCount; i++) {
+ PhysicalRelation rel =
rfToPush.getTargetScans().get(i);
+ if (rel instanceof PhysicalCTEConsumer
+ && ((PhysicalCTEConsumer)
rel).getCteId().equals(cteId)) {
+ rightDeepTargetExpressionOnCTE =
rfToPush.getTargetExpressions().get(i);
+ break;
+ }
}
- }
- boolean pushedDown =
doPushDownIntoCTEProducerInternal(
- rightDeep,
- rightDeepTargetExpressionOnCTE,
- rfCtx,
- rfCtx.getCteProduceMap().get(cteId)
- );
- if (pushedDown) {
- rfCtx.removeFilter(
-
rightDeepTargetExpressionOnCTE.getInputSlotExprIds().iterator().next(),
- (PhysicalHashJoin)
rightDeep.getBuilderNode());
+ boolean pushedDown =
doPushDownIntoCTEProducerInternal(
+ rfToPush,
+ rightDeepTargetExpressionOnCTE,
+ rfCtx,
+ rfCtx.getCteProduceMap().get(cteId)
+ );
+ if (pushedDown) {
+ rfCtx.removeFilter(
+
rightDeepTargetExpressionOnCTE.getInputSlotExprIds().iterator().next(),
+ (PhysicalHashJoin)
rfToPush.getBuilderNode());
+ }
}
}
}
diff --git
a/fe/fe-core/src/test/java/org/apache/doris/nereids/postprocess/RuntimeFilterTest.java
b/fe/fe-core/src/test/java/org/apache/doris/nereids/postprocess/RuntimeFilterTest.java
index ce647427d21..f354ed5b02f 100644
---
a/fe/fe-core/src/test/java/org/apache/doris/nereids/postprocess/RuntimeFilterTest.java
+++
b/fe/fe-core/src/test/java/org/apache/doris/nereids/postprocess/RuntimeFilterTest.java
@@ -295,7 +295,7 @@ public class RuntimeFilterTest extends SSBTestBase {
.rewrite()
.implement();
PhysicalPlan plan = checker.getPhysicalPlan();
- new PlanPostProcessors(checker.getCascadesContext()).process(plan);
+ plan = new
PlanPostProcessors(checker.getCascadesContext()).process(plan);
System.out.println(plan.treeString());
new PhysicalPlanTranslator(new
PlanTranslatorContext(checker.getCascadesContext())).translatePlan(plan);
RuntimeFilterContext context =
checker.getCascadesContext().getRuntimeFilterContext();
diff --git
a/regression-test/data/nereids_tpcds_shape_sf100_p0/noStatsRfPrune/query95.out
b/regression-test/data/nereids_tpcds_shape_sf100_p0/noStatsRfPrune/query95.out
index 6e12e33b54d..82a42a4795c 100644
---
a/regression-test/data/nereids_tpcds_shape_sf100_p0/noStatsRfPrune/query95.out
+++
b/regression-test/data/nereids_tpcds_shape_sf100_p0/noStatsRfPrune/query95.out
@@ -26,7 +26,7 @@ PhysicalCteAnchor ( cteId=CTEId#0 )
----------------------hashJoin[INNER_JOIN] hashCondition=((ws1.ws_web_site_sk
= web_site.web_site_sk)) otherCondition=() build RFs:RF5
web_site_sk->[ws_web_site_sk]
------------------------hashJoin[INNER_JOIN]
hashCondition=((ws1.ws_ship_addr_sk = customer_address.ca_address_sk))
otherCondition=() build RFs:RF4 ca_address_sk->[ws_ship_addr_sk]
--------------------------hashJoin[INNER_JOIN]
hashCondition=((ws1.ws_ship_date_sk = date_dim.d_date_sk)) otherCondition=()
build RFs:RF3 d_date_sk->[ws_ship_date_sk]
-----------------------------hashJoin[RIGHT_SEMI_JOIN]
hashCondition=((ws1.ws_order_number = web_returns.wr_order_number))
otherCondition=() build RFs:RF7
ws_order_number->[ws_order_number,ws_order_number]
+----------------------------hashJoin[RIGHT_SEMI_JOIN]
hashCondition=((ws1.ws_order_number = web_returns.wr_order_number))
otherCondition=() build RFs:RF2 ws_order_number->[wr_order_number];RF7
ws_order_number->[ws_order_number,ws_order_number]
------------------------------PhysicalProject
--------------------------------hashJoin[INNER_JOIN]
hashCondition=((web_returns.wr_order_number = ws_wh.ws_order_number))
otherCondition=()
----------------------------------PhysicalDistribute[DistributionSpecHash]
diff --git
a/regression-test/data/nereids_tpcds_shape_sf100_p0/no_stats_shape/query95.out
b/regression-test/data/nereids_tpcds_shape_sf100_p0/no_stats_shape/query95.out
index 262c9046490..bf426f81483 100644
---
a/regression-test/data/nereids_tpcds_shape_sf100_p0/no_stats_shape/query95.out
+++
b/regression-test/data/nereids_tpcds_shape_sf100_p0/no_stats_shape/query95.out
@@ -26,7 +26,7 @@ PhysicalCteAnchor ( cteId=CTEId#0 )
----------------------hashJoin[INNER_JOIN] hashCondition=((ws1.ws_web_site_sk
= web_site.web_site_sk)) otherCondition=() build RFs:RF5
web_site_sk->[ws_web_site_sk]
------------------------hashJoin[INNER_JOIN]
hashCondition=((ws1.ws_ship_addr_sk = customer_address.ca_address_sk))
otherCondition=() build RFs:RF4 ca_address_sk->[ws_ship_addr_sk]
--------------------------hashJoin[INNER_JOIN]
hashCondition=((ws1.ws_ship_date_sk = date_dim.d_date_sk)) otherCondition=()
build RFs:RF3 d_date_sk->[ws_ship_date_sk]
-----------------------------hashJoin[RIGHT_SEMI_JOIN]
hashCondition=((ws1.ws_order_number = web_returns.wr_order_number))
otherCondition=() build RFs:RF7
ws_order_number->[ws_order_number,ws_order_number]
+----------------------------hashJoin[RIGHT_SEMI_JOIN]
hashCondition=((ws1.ws_order_number = web_returns.wr_order_number))
otherCondition=() build RFs:RF2 ws_order_number->[wr_order_number];RF7
ws_order_number->[ws_order_number,ws_order_number]
------------------------------PhysicalProject
--------------------------------hashJoin[INNER_JOIN]
hashCondition=((web_returns.wr_order_number = ws_wh.ws_order_number))
otherCondition=() build RFs:RF1 wr_order_number->[ws_order_number]
----------------------------------PhysicalDistribute[DistributionSpecHash]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]