This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.1 by this push:
new 5ce58ed7736 [fix](nereids) runtime filter push-down-cte column mapping bug #34875
5ce58ed7736 is described below
commit 5ce58ed7736ed0017b70da7ecacc9b4565a13f16
Author: minghong <[email protected]>
AuthorDate: Wed May 15 10:31:39 2024 +0800
[fix](nereids) runtime filter push-down-cte column mapping bug #34875
---
.../processor/post/RuntimeFilterGenerator.java | 89 +++++++++++-----------
1 file changed, 46 insertions(+), 43 deletions(-)
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/RuntimeFilterGenerator.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/RuntimeFilterGenerator.java
index 5722fa527db..6ace6297aab 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/RuntimeFilterGenerator.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/RuntimeFilterGenerator.java
@@ -533,54 +533,57 @@ public class RuntimeFilterGenerator extends PlanPostProcessor {
if (!checkProbeSlot(ctx, unwrappedSlot)) {
return false;
}
- Slot cteSlot = ctx.getAliasTransferPair(unwrappedSlot).second;
+ Slot consumerOutputSlot =
ctx.getAliasTransferPair(unwrappedSlot).second;
PhysicalRelation cteNode =
ctx.getAliasTransferPair(unwrappedSlot).first;
long buildSideNdv = rf.getBuildSideNdv();
- if (cteNode instanceof PhysicalCTEConsumer && inputPlanNode instanceof
PhysicalProject) {
- PhysicalProject<Plan> project = (PhysicalProject<Plan>)
inputPlanNode;
- NamedExpression targetExpr = null;
- for (NamedExpression ne : project.getProjects()) {
- if (cteSlot.getName().equals(ne.getName())) {
- targetExpr = ne;
- break;
- }
+ if (!(cteNode instanceof PhysicalCTEConsumer) || !(inputPlanNode
instanceof PhysicalProject)) {
+ return false;
+ }
+ Slot cteSlot = ((PhysicalCTEConsumer)
cteNode).getProducerSlot(consumerOutputSlot);
+
+ PhysicalProject<Plan> project = (PhysicalProject<Plan>) inputPlanNode;
+ NamedExpression targetExpr = null;
+ for (NamedExpression ne : project.getProjects()) {
+ if (cteSlot.getExprId().equals(ne.getExprId())) {
+ targetExpr = ne;
+ break;
}
- Preconditions.checkState(targetExpr != null,
- "cannot find runtime filter cte.target: "
- + cteSlot + "in project " + project.toString());
- if (targetExpr instanceof SlotReference &&
checkCanPushDownIntoBasicTable(project)) {
- Map<Slot, PhysicalRelation> pushDownBasicTableInfos =
getPushDownBasicTablesInfos(project,
- (SlotReference) targetExpr, ctx);
- if (!pushDownBasicTableInfos.isEmpty()) {
- List<Slot> targetList = new ArrayList<>();
- List<Expression> targetExpressions = new ArrayList<>();
- List<PhysicalRelation> targetNodes = new ArrayList<>();
- for (Map.Entry<Slot, PhysicalRelation> entry :
pushDownBasicTableInfos.entrySet()) {
- Slot targetSlot = entry.getKey();
- PhysicalRelation scan = entry.getValue();
- if
(!RuntimeFilterGenerator.checkPushDownPreconditionsForRelation(project, scan)) {
- continue;
- }
- targetList.add(targetSlot);
- targetExpressions.add(targetSlot);
- targetNodes.add(scan);
- ctx.addJoinToTargetMap(rf.getBuilderNode(),
targetSlot.getExprId());
- ctx.setTargetsOnScanNode(scan, targetSlot);
- }
- if (targetList.isEmpty()) {
- return false;
- }
- RuntimeFilter filter = new
RuntimeFilter(generator.getNextId(),
- rf.getSrcExpr(), targetList, targetExpressions,
rf.getType(), rf.getExprOrder(),
- rf.getBuilderNode(), buildSideNdv,
rf.isBloomFilterSizeCalculatedByNdv(),
- rf.gettMinMaxType(), cteNode);
- targetNodes.forEach(node ->
node.addAppliedRuntimeFilter(filter));
- for (Slot slot : targetList) {
- ctx.setTargetExprIdToFilter(slot.getExprId(), filter);
+ }
+ Preconditions.checkState(targetExpr != null,
+ "cannot find runtime filter cte.target: "
+ + cteSlot + "in project " + project.toString());
+ if (targetExpr instanceof SlotReference &&
checkCanPushDownIntoBasicTable(project)) {
+ Map<Slot, PhysicalRelation> pushDownBasicTableInfos =
getPushDownBasicTablesInfos(project,
+ (SlotReference) targetExpr, ctx);
+ if (!pushDownBasicTableInfos.isEmpty()) {
+ List<Slot> targetList = new ArrayList<>();
+ List<Expression> targetExpressions = new ArrayList<>();
+ List<PhysicalRelation> targetNodes = new ArrayList<>();
+ for (Map.Entry<Slot, PhysicalRelation> entry :
pushDownBasicTableInfos.entrySet()) {
+ Slot targetSlot = entry.getKey();
+ PhysicalRelation scan = entry.getValue();
+ if
(!RuntimeFilterGenerator.checkPushDownPreconditionsForRelation(project, scan)) {
+ continue;
}
- ctx.setRuntimeFilterIdentityToFilter(rf.getSrcExpr(),
rf.getType(), rf.getBuilderNode(), filter);
- return true;
+ targetList.add(targetSlot);
+ targetExpressions.add(targetSlot);
+ targetNodes.add(scan);
+ ctx.addJoinToTargetMap(rf.getBuilderNode(),
targetSlot.getExprId());
+ ctx.setTargetsOnScanNode(scan, targetSlot);
+ }
+ if (targetList.isEmpty()) {
+ return false;
+ }
+ RuntimeFilter filter = new RuntimeFilter(generator.getNextId(),
+ rf.getSrcExpr(), targetList, targetExpressions,
rf.getType(), rf.getExprOrder(),
+ rf.getBuilderNode(), buildSideNdv,
rf.isBloomFilterSizeCalculatedByNdv(),
+ rf.gettMinMaxType(), cteNode);
+ targetNodes.forEach(node ->
node.addAppliedRuntimeFilter(filter));
+ for (Slot slot : targetList) {
+ ctx.setTargetExprIdToFilter(slot.getExprId(), filter);
}
+ ctx.setRuntimeFilterIdentityToFilter(rf.getSrcExpr(),
rf.getType(), rf.getBuilderNode(), filter);
+ return true;
}
}
return false;
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]