This is an automated email from the ASF dual-hosted git repository.
morrysnow pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 7f9b130c1d3 [fix](nereids) runtime filter generation bug for topn lazy
materialization (#56137)
7f9b130c1d3 is described below
commit 7f9b130c1d3b66a0185dc4fd8db83011a5304afc
Author: minghong <[email protected]>
AuthorDate: Fri Sep 19 14:28:36 2025 +0800
[fix](nereids) runtime filter generation bug for topn lazy materialization
(#56137)
### What problem does this PR solve?
Previous pr (topn lazy materialization, #51329 commit id a4b50087)
introduces a bug in runtime filter target translation.
the runtime filter target should be basased on
PhysicalLazyMaterializeOlapScan, not the inner PhysicalOlapScan.
---
.../glue/translator/PhysicalPlanTranslator.java | 138 +++++----------------
.../suites/query_p0/topn_lazy/topn_lazy.groovy | 9 +-
2 files changed, 36 insertions(+), 111 deletions(-)
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java
index 06b0ad10674..bafbacfb032 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java
@@ -668,14 +668,6 @@ public class PhysicalPlanTranslator extends
DefaultPlanVisitor<PlanFragment, Pla
fileScan.getTableSnapshot().ifPresent(fileQueryScanNode::setQueryTableSnapshot);
fileScan.getScanParams().ifPresent(fileQueryScanNode::setScanParams);
}
- // translate rf v2 target
- List<RuntimeFilterV2> rfV2s = context.getRuntimeFilterV2Context()
- .getRuntimeFilterV2ByTargetPlan(fileScan);
- for (RuntimeFilterV2 rfV2 : rfV2s) {
- Expr targetExpr =
rfV2.getTargetExpression().accept(ExpressionTranslator.INSTANCE, context);
- rfV2.setLegacyTargetNode(scanNode);
- rfV2.setLegacyTargetExpr(targetExpr);
- }
return getPlanFragmentForPhysicalFileScan(fileScan, context, scanNode,
table, tupleDescriptor);
}
@@ -777,21 +769,7 @@ public class PhysicalPlanTranslator extends
DefaultPlanVisitor<PlanFragment, Pla
}
Utils.execWithUncheckedException(scanNode::init);
context.addScanNode(scanNode, fileScan);
- ScanNode finalScanNode = scanNode;
- context.getRuntimeTranslator().ifPresent(
- runtimeFilterGenerator ->
runtimeFilterGenerator.getContext().getTargetListByScan(fileScan).forEach(
- expr ->
runtimeFilterGenerator.translateRuntimeFilterTarget(expr, finalScanNode,
context)
- )
- );
- // translate rf v2 target
- List<RuntimeFilterV2> rfV2s = context.getRuntimeFilterV2Context()
- .getRuntimeFilterV2ByTargetPlan(fileScan);
- for (RuntimeFilterV2 rfV2 : rfV2s) {
- Expr targetExpr =
rfV2.getTargetExpression().accept(ExpressionTranslator.INSTANCE, context);
- rfV2.setLegacyTargetNode(scanNode);
- rfV2.setLegacyTargetExpr(targetExpr);
- }
- context.getTopnFilterContext().translateTarget(fileScan, scanNode,
context);
+ translateRuntimeFilter(fileScan, scanNode, context);
// Create PlanFragment
DataPartition dataPartition = DataPartition.RANDOM;
PlanFragment planFragment = createPlanFragment(scanNode,
dataPartition, fileScan);
@@ -811,21 +789,7 @@ public class PhysicalPlanTranslator extends
DefaultPlanVisitor<PlanFragment, Pla
context.getNereidsIdToPlanNodeIdMap().put(jdbcScan.getId(),
jdbcScanNode.getId());
Utils.execWithUncheckedException(jdbcScanNode::init);
context.addScanNode(jdbcScanNode, jdbcScan);
- context.getRuntimeTranslator().ifPresent(
- runtimeFilterGenerator ->
runtimeFilterGenerator.getContext().getTargetListByScan(jdbcScan).forEach(
- expr ->
runtimeFilterGenerator.translateRuntimeFilterTarget(expr, jdbcScanNode, context)
- )
- );
- // translate rf v2 target
- List<RuntimeFilterV2> rfV2s = context.getRuntimeFilterV2Context()
- .getRuntimeFilterV2ByTargetPlan(jdbcScan);
- for (RuntimeFilterV2 rfV2 : rfV2s) {
- Expr targetExpr =
rfV2.getTargetExpression().accept(ExpressionTranslator.INSTANCE, context);
- rfV2.setLegacyTargetNode(jdbcScanNode);
- rfV2.setLegacyTargetExpr(targetExpr);
- }
-
- context.getTopnFilterContext().translateTarget(jdbcScan, jdbcScanNode,
context);
+ translateRuntimeFilter(jdbcScan, jdbcScanNode, context);
DataPartition dataPartition = DataPartition.RANDOM;
PlanFragment planFragment = new PlanFragment(context.nextFragmentId(),
jdbcScanNode, dataPartition);
context.addPlanFragment(planFragment);
@@ -844,21 +808,7 @@ public class PhysicalPlanTranslator extends
DefaultPlanVisitor<PlanFragment, Pla
context.getNereidsIdToPlanNodeIdMap().put(odbcScan.getId(),
odbcScanNode.getId());
Utils.execWithUncheckedException(odbcScanNode::init);
context.addScanNode(odbcScanNode, odbcScan);
- context.getRuntimeTranslator().ifPresent(
- runtimeFilterGenerator ->
runtimeFilterGenerator.getContext().getTargetListByScan(odbcScan).forEach(
- expr ->
runtimeFilterGenerator.translateRuntimeFilterTarget(expr, odbcScanNode, context)
- )
- );
- // translate rf v2 target
- List<RuntimeFilterV2> rfV2s = context.getRuntimeFilterV2Context()
- .getRuntimeFilterV2ByTargetPlan(odbcScan);
- for (RuntimeFilterV2 rfV2 : rfV2s) {
- Expr targetExpr =
rfV2.getTargetExpression().accept(ExpressionTranslator.INSTANCE, context);
- rfV2.setLegacyTargetNode(odbcScanNode);
- rfV2.setLegacyTargetExpr(targetExpr);
- }
- context.getTopnFilterContext().translateTarget(odbcScan, odbcScanNode,
context);
- context.getTopnFilterContext().translateTarget(odbcScan, odbcScanNode,
context);
+ translateRuntimeFilter(odbcScan, odbcScanNode, context);
DataPartition dataPartition = DataPartition.RANDOM;
PlanFragment planFragment = new PlanFragment(context.nextFragmentId(),
odbcScanNode, dataPartition);
context.addPlanFragment(planFragment);
@@ -980,25 +930,10 @@ public class PhysicalPlanTranslator extends
DefaultPlanVisitor<PlanFragment, Pla
// create scan range
Utils.execWithUncheckedException(olapScanNode::init);
- // TODO: process collect scan node in one place
context.addScanNode(olapScanNode, olapScan);
- // TODO: process translate runtime filter in one place
- // use real plan node to present rf apply and rf generator
- context.getRuntimeTranslator().ifPresent(
- runtimeFilterTranslator ->
runtimeFilterTranslator.getContext().getTargetListByScan(olapScan)
- .forEach(expr ->
runtimeFilterTranslator.translateRuntimeFilterTarget(
- expr, olapScanNode, context)
- )
- );
- // translate rf v2 target
- List<RuntimeFilterV2> rfV2s = context.getRuntimeFilterV2Context()
- .getRuntimeFilterV2ByTargetPlan(olapScan);
- for (RuntimeFilterV2 rfV2 : rfV2s) {
- Expr targetExpr =
rfV2.getTargetExpression().accept(ExpressionTranslator.INSTANCE, context);
- rfV2.setLegacyTargetNode(olapScanNode);
- rfV2.setLegacyTargetExpr(targetExpr);
- }
- context.getTopnFilterContext().translateTarget(olapScan, olapScanNode,
context);
+
+ translateRuntimeFilter(olapScan, olapScanNode, context);
+
olapScanNode.setPushDownAggNoGrouping(context.getRelationPushAggOp(olapScan.getRelationId()));
// Create PlanFragment
// TODO: use a util function to convert distribution to DataPartition
@@ -1019,6 +954,26 @@ public class PhysicalPlanTranslator extends
DefaultPlanVisitor<PlanFragment, Pla
return planFragment;
}
+ private void translateRuntimeFilter(PhysicalRelation physicalRelation,
ScanNode scanNode,
+ PlanTranslatorContext context) {
+ if (context.getRuntimeTranslator().isPresent()) {
+ RuntimeFilterTranslator runtimeFilterTranslator =
context.getRuntimeTranslator().get();
+ for (Slot slot :
runtimeFilterTranslator.getContext().getTargetListByScan(physicalRelation)) {
+ runtimeFilterTranslator.translateRuntimeFilterTarget(slot,
scanNode, context);
+ }
+ }
+
+ // translate rf v2 target
+ List<RuntimeFilterV2> rfV2s = context.getRuntimeFilterV2Context()
+ .getRuntimeFilterV2ByTargetPlan(physicalRelation);
+ for (RuntimeFilterV2 rfV2 : rfV2s) {
+ Expr targetExpr =
rfV2.getTargetExpression().accept(ExpressionTranslator.INSTANCE, context);
+ rfV2.setLegacyTargetNode(scanNode);
+ rfV2.setLegacyTargetExpr(targetExpr);
+ }
+ context.getTopnFilterContext().translateTarget(physicalRelation,
scanNode, context);
+ }
+
@Override
public PlanFragment visitPhysicalDeferMaterializeOlapScan(
PhysicalDeferMaterializeOlapScan deferMaterializeOlapScan,
PlanTranslatorContext context) {
@@ -1094,21 +1049,9 @@ public class PhysicalPlanTranslator extends
DefaultPlanVisitor<PlanFragment, Pla
}
scanNode.setNereidsId(schemaScan.getId());
context.getNereidsIdToPlanNodeIdMap().put(schemaScan.getId(),
scanNode.getId());
- SchemaScanNode finalScanNode = scanNode;
- context.getRuntimeTranslator().ifPresent(
- runtimeFilterGenerator ->
runtimeFilterGenerator.getContext().getTargetListByScan(schemaScan)
- .forEach(expr -> runtimeFilterGenerator
- .translateRuntimeFilterTarget(expr,
finalScanNode, context)
- )
- );
- // translate rf v2 target
- List<RuntimeFilterV2> rfV2s = context.getRuntimeFilterV2Context()
- .getRuntimeFilterV2ByTargetPlan(schemaScan);
- for (RuntimeFilterV2 rfV2 : rfV2s) {
- Expr targetExpr =
rfV2.getTargetExpression().accept(ExpressionTranslator.INSTANCE, context);
- rfV2.setLegacyTargetNode(scanNode);
- rfV2.setLegacyTargetExpr(targetExpr);
- }
+
+ translateRuntimeFilter(schemaScan, scanNode, context);
+
context.addScanNode(scanNode, schemaScan);
PlanFragment planFragment = createPlanFragment(scanNode,
DataPartition.RANDOM, schemaScan);
context.addPlanFragment(planFragment);
@@ -1423,17 +1366,7 @@ public class PhysicalPlanTranslator extends
DefaultPlanVisitor<PlanFragment, Pla
}
}
CTEScanNode cteScanNode = new CTEScanNode(tupleDescriptor);
- context.getRuntimeTranslator().ifPresent(runtimeFilterTranslator ->
-
runtimeFilterTranslator.getContext().getTargetListByScan(cteConsumer).forEach(
- expr ->
runtimeFilterTranslator.translateRuntimeFilterTarget(expr, cteScanNode,
context)));
- // translate rf v2 target
- List<RuntimeFilterV2> rfV2s = context.getRuntimeFilterV2Context()
- .getRuntimeFilterV2ByTargetPlan(cteConsumer);
- for (RuntimeFilterV2 rfV2 : rfV2s) {
- Expr targetExpr =
rfV2.getTargetExpression().accept(ExpressionTranslator.INSTANCE, context);
- rfV2.setLegacyTargetNode(cteScanNode);
- rfV2.setLegacyTargetExpr(targetExpr);
- }
+ translateRuntimeFilter(cteConsumer, cteScanNode, context);
context.getCteScanNodeMap().put(multiCastFragment.getFragmentId(),
cteScanNode);
return multiCastFragment;
@@ -2806,15 +2739,8 @@ public class PhysicalPlanTranslator extends
DefaultPlanVisitor<PlanFragment, Pla
olapScanNode.addTopnLazyMaterializeOutputColumns(((SlotReference)
slot).getOriginalColumn().get());
}
}
- // translate rf v2 target
- List<RuntimeFilterV2> rfV2s = context.getRuntimeFilterV2Context()
- .getRuntimeFilterV2ByTargetPlan(lazyScan);
- for (RuntimeFilterV2 rfV2 : rfV2s) {
- Expr targetExpr =
rfV2.getTargetExpression().accept(ExpressionTranslator.INSTANCE, context);
- rfV2.setLegacyTargetNode(olapScanNode);
- rfV2.setLegacyTargetExpr(targetExpr);
- }
- context.getTopnFilterContext().translateTarget(lazyScan, olapScanNode,
context);
+
+ translateRuntimeFilter(lazyScan, olapScanNode, context);
return planFragment;
}
diff --git a/regression-test/suites/query_p0/topn_lazy/topn_lazy.groovy
b/regression-test/suites/query_p0/topn_lazy/topn_lazy.groovy
index a8cede02d68..79faad12a11 100644
--- a/regression-test/suites/query_p0/topn_lazy/topn_lazy.groovy
+++ b/regression-test/suites/query_p0/topn_lazy/topn_lazy.groovy
@@ -19,6 +19,7 @@ suite("topn_lazy") {
sql """
set topn_lazy_materialization_threshold=1024;
set runtime_filter_mode=GLOBAL;
+ set runtime_filter_type=BLOOM_FILTER;
set TOPN_FILTER_RATIO=0.5;
set disable_join_reorder=true;
"""
@@ -64,6 +65,8 @@ suite("topn_lazy") {
contains("column_descs_lists[[`lo_orderkey` bigint NOT NULL,
`lo_linenumber` bigint NOT NULL, `lo_custkey` int NOT NULL, `lo_partkey` int
NOT NULL, `lo_suppkey` int NOT NULL, `lo_orderpriority` varchar(16) NOT NULL,
`lo_shippriority` int NOT NULL, `lo_quantity` bigint NOT NULL,
`lo_extendedprice` bigint NOT NULL, `lo_ordtotalprice` bigint NOT NULL,
`lo_discount` bigint NOT NULL, `lo_revenue` bigint NOT NULL, `lo_supplycost`
bigint NOT NULL, `lo_tax` bigint NOT NULL, `lo_commitdat [...]
contains("locations: [[3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15,
16, 17, 18], [19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33]")
contains("row_ids: [__DORIS_GLOBAL_ROWID_COL__lineorder,
__DORIS_GLOBAL_ROWID_COL__date]")
+ contains("runtime filters: RF000[bloom] -> lo_orderdate")
+
}
@@ -105,7 +108,7 @@ suite("topn_lazy") {
multiContains("VMaterializeNode", 1)
}
- explain {
+ explain {
sql """ select *
from
customer left semi join (
@@ -215,10 +218,6 @@ suite("topn_lazy") {
ORDER BY u.user_id LIMIT 5;
"""
- // Cleanup tables
- sql """ DROP TABLE IF EXISTS users """
- sql """ DROP TABLE IF EXISTS orders """
-
sql """
drop table if exists
table_100_undef_partitions2_keys3_properties4_distributed_by52;
create table table_100_undef_partitions2_keys3_properties4_distributed_by52 (
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]