This is an automated email from the ASF dual-hosted git repository.
morrysnow pushed a commit to branch branch-3.1
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-3.1 by this push:
new d600080a006 branch-3.1: [fix](nereids) fix bug when CTEConsumer is
used as runtime filter target #51807 (#52750)
d600080a006 is described below
commit d600080a006c22868bc77e24f5568cc2efea095b
Author: github-actions[bot]
<41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Tue Jul 8 14:14:18 2025 +0800
branch-3.1: [fix](nereids) fix bug when CTEConsumer is used as runtime
filter target #51807 (#52750)
Cherry-picked from #51807
Co-authored-by: minghong <[email protected]>
---
.../glue/translator/RuntimeFilterTranslator.java | 145 +++++++++++----------
.../runtime_filter/cte-runtime-filter.out | Bin 0 -> 1411 bytes
.../runtime_filter/cte-runtime-filter.groovy | 71 ++++++++++
3 files changed, 149 insertions(+), 67 deletions(-)
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/RuntimeFilterTranslator.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/RuntimeFilterTranslator.java
index 07e0af60173..d034145a463 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/RuntimeFilterTranslator.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/RuntimeFilterTranslator.java
@@ -41,6 +41,8 @@ import org.apache.doris.thrift.TRuntimeFilterType;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
import java.util.ArrayList;
import java.util.List;
@@ -50,6 +52,7 @@ import java.util.Map;
* translate runtime filter
*/
public class RuntimeFilterTranslator {
+ private static final Logger LOG =
LogManager.getLogger(RuntimeFilterTranslator.class);
private final RuntimeFilterContext context;
@@ -96,77 +99,85 @@ public class RuntimeFilterTranslator {
.getIgnoredRuntimeFilterIds().contains(filter.getId().asInt())) {
return;
}
- Expr src = ExpressionTranslator.translate(filter.getSrcExpr(), ctx);
- List<Expr> targetExprList = new ArrayList<>();
- List<Map<TupleId, List<SlotId>>> targetTupleIdMapList = new
ArrayList<>();
- List<ScanNode> scanNodeList = new ArrayList<>();
- boolean hasInvalidTarget = false;
- for (int i = 0; i < filter.getTargetExpressions().size(); i++) {
- Slot curTargetSlot = filter.getTargetSlots().get(i);
- Expression curTargetExpression =
filter.getTargetExpressions().get(i);
- SlotRef targetSlotRef =
context.getExprIdToOlapScanNodeSlotRef().get(curTargetSlot.getExprId());
- if (targetSlotRef == null) {
- context.setTargetNullCount();
- hasInvalidTarget = true;
- break;
- }
- ScanNode scanNode =
context.getScanNodeOfLegacyRuntimeFilterTarget().get(curTargetSlot);
- Expr targetExpr;
- if (curTargetSlot.equals(curTargetExpression)) {
- targetExpr = targetSlotRef;
- } else {
- // map nereids target slot to original planner slot
-
Preconditions.checkArgument(curTargetExpression.getInputSlots().size() == 1,
- "target expression is invalid, input slot num > 1;
filter :" + filter);
- Slot slotInTargetExpression =
curTargetExpression.getInputSlots().iterator().next();
-
Preconditions.checkArgument(slotInTargetExpression.equals(curTargetSlot)
- ||
curTargetSlot.equals(context.getAliasTransferMap().get(slotInTargetExpression).second));
- RuntimeFilterExpressionTranslator translator = new
RuntimeFilterExpressionTranslator(targetSlotRef);
- targetExpr = curTargetExpression.accept(translator, ctx);
- }
+ try {
+ Expr src = ExpressionTranslator.translate(filter.getSrcExpr(),
ctx);
+ List<Expr> targetExprList = new ArrayList<>();
+ List<Map<TupleId, List<SlotId>>> targetTupleIdMapList = new
ArrayList<>();
+ List<ScanNode> scanNodeList = new ArrayList<>();
+ boolean hasInvalidTarget = false;
+ for (int i = 0; i < filter.getTargetExpressions().size(); i++) {
+ Slot curTargetSlot = filter.getTargetSlots().get(i);
+ Expression curTargetExpression =
filter.getTargetExpressions().get(i);
+ SlotRef targetSlotRef =
context.getExprIdToOlapScanNodeSlotRef().get(curTargetSlot.getExprId());
+ if (targetSlotRef == null) {
+ context.setTargetNullCount();
+ hasInvalidTarget = true;
+ break;
+ }
+ ScanNode scanNode =
context.getScanNodeOfLegacyRuntimeFilterTarget().get(curTargetSlot);
+ Expr targetExpr;
+ if (curTargetSlot.equals(curTargetExpression)) {
+ targetExpr = targetSlotRef;
+ } else {
+ // map nereids target slot to original planner slot
+
Preconditions.checkArgument(curTargetExpression.getInputSlots().size() == 1,
+ "target expression is invalid, input slot num > 1;
filter :" + filter);
+ Slot slotInTargetExpression =
curTargetExpression.getInputSlots().iterator().next();
+
Preconditions.checkArgument(slotInTargetExpression.equals(curTargetSlot)
+ ||
curTargetSlot.equals(context.getAliasTransferMap().get(slotInTargetExpression).second));
+ RuntimeFilterExpressionTranslator translator = new
RuntimeFilterExpressionTranslator(targetSlotRef);
+ targetExpr = curTargetExpression.accept(translator, ctx);
+ }
- // adjust data type
- if (!src.getType().equals(targetExpr.getType()) &&
filter.getType() != TRuntimeFilterType.BITMAP) {
- targetExpr = new CastExpr(src.getType(), targetExpr);
+ // adjust data type
+ if (!src.getType().equals(targetExpr.getType()) &&
filter.getType() != TRuntimeFilterType.BITMAP) {
+ targetExpr = new CastExpr(src.getType(), targetExpr);
+ }
+ TupleId targetTupleId =
targetSlotRef.getDesc().getParent().getId();
+ SlotId targetSlotId = targetSlotRef.getSlotId();
+ scanNodeList.add(scanNode);
+ targetExprList.add(targetExpr);
+ targetTupleIdMapList.add(ImmutableMap.of(targetTupleId,
ImmutableList.of(targetSlotId)));
}
- SlotRef targetSlot = targetSlotRef.getSrcSlotRef();
- TupleId targetTupleId = targetSlot.getDesc().getParent().getId();
- SlotId targetSlotId = targetSlot.getSlotId();
- scanNodeList.add(scanNode);
- targetExprList.add(targetExpr);
- targetTupleIdMapList.add(ImmutableMap.of(targetTupleId,
ImmutableList.of(targetSlotId)));
- }
- if (!hasInvalidTarget) {
- org.apache.doris.planner.RuntimeFilter origFilter
- =
org.apache.doris.planner.RuntimeFilter.fromNereidsRuntimeFilter(
- filter, node, src, targetExprList,
- targetTupleIdMapList, context.getLimits());
- if (node instanceof HashJoinNode) {
- origFilter.setIsBroadcast(((HashJoinNode)
node).getDistributionMode() == DistributionMode.BROADCAST);
- origFilter.setSingleEq(((HashJoinNode)
node).getEqJoinConjuncts().size());
- } else {
- // nest loop join
- origFilter.setIsBroadcast(true);
+ if (!hasInvalidTarget) {
+ org.apache.doris.planner.RuntimeFilter origFilter
+ =
org.apache.doris.planner.RuntimeFilter.fromNereidsRuntimeFilter(
+ filter, node, src, targetExprList,
+ targetTupleIdMapList, context.getLimits());
+ if (node instanceof HashJoinNode) {
+ origFilter.setIsBroadcast(
+ ((HashJoinNode) node).getDistributionMode() ==
DistributionMode.BROADCAST);
+ origFilter.setSingleEq(((HashJoinNode)
node).getEqJoinConjuncts().size());
+ } else {
+ // nest loop join
+ origFilter.setIsBroadcast(true);
+ }
+ boolean isLocalTarget = scanNodeList.stream().allMatch(e ->
+ !(e instanceof CTEScanNode) &&
e.getFragmentId().equals(node.getFragmentId()));
+ for (int i = 0; i < targetExprList.size(); i++) {
+ ScanNode scanNode = scanNodeList.get(i);
+ Expr targetExpr = targetExprList.get(i);
+ origFilter.addTarget(new RuntimeFilterTarget(
+ scanNode, targetExpr, true, isLocalTarget));
+ }
+ origFilter.setBitmapFilterNotIn(filter.isBitmapFilterNotIn());
+
origFilter.setBloomFilterSizeCalculatedByNdv(filter.isBloomFilterSizeCalculatedByNdv());
+ org.apache.doris.planner.RuntimeFilter finalizedFilter =
finalize(origFilter);
+ scanNodeList.stream().filter(e -> e.getStatisticalType() ==
StatisticalType.CTE_SCAN_NODE)
+ .forEach(f -> {
+ DataStreamSink sink =
context.getPlanNodeIdToCTEDataSinkMap().get(f.getId());
+ if (sink != null) {
+ sink.addRuntimeFilter(finalizedFilter);
+ }
+ });
+ context.getLegacyFilters().add(finalizedFilter);
}
- boolean isLocalTarget = scanNodeList.stream().allMatch(e ->
- !(e instanceof CTEScanNode) &&
e.getFragmentId().equals(node.getFragmentId()));
- for (int i = 0; i < targetExprList.size(); i++) {
- ScanNode scanNode = scanNodeList.get(i);
- Expr targetExpr = targetExprList.get(i);
- origFilter.addTarget(new RuntimeFilterTarget(
- scanNode, targetExpr, true, isLocalTarget));
+ } catch (Exception e) {
+ LOG.info("failed to translate runtime filter: " + e.getMessage());
+ // throw exception in debug mode
+ if (ConnectContext.get() != null &&
ConnectContext.get().getSessionVariable().feDebug) {
+ throw e;
}
- origFilter.setBitmapFilterNotIn(filter.isBitmapFilterNotIn());
-
origFilter.setBloomFilterSizeCalculatedByNdv(filter.isBloomFilterSizeCalculatedByNdv());
- org.apache.doris.planner.RuntimeFilter finalizedFilter =
finalize(origFilter);
- scanNodeList.stream().filter(e -> e.getStatisticalType() ==
StatisticalType.CTE_SCAN_NODE)
- .forEach(f -> {
- DataStreamSink sink =
context.getPlanNodeIdToCTEDataSinkMap().get(f.getId());
- if (sink != null) {
-
sink.addRuntimeFilter(finalizedFilter);
- }
- });
- context.getLegacyFilters().add(finalizedFilter);
}
}
diff --git
a/regression-test/data/nereids_p0/runtime_filter/cte-runtime-filter.out
b/regression-test/data/nereids_p0/runtime_filter/cte-runtime-filter.out
new file mode 100644
index 00000000000..cff6f05e5b3
Binary files /dev/null and
b/regression-test/data/nereids_p0/runtime_filter/cte-runtime-filter.out differ
diff --git
a/regression-test/suites/nereids_p0/runtime_filter/cte-runtime-filter.groovy
b/regression-test/suites/nereids_p0/runtime_filter/cte-runtime-filter.groovy
new file mode 100644
index 00000000000..b5ce12e0851
--- /dev/null
+++ b/regression-test/suites/nereids_p0/runtime_filter/cte-runtime-filter.groovy
@@ -0,0 +1,71 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+suite('cte-runtime-filter') {
+
+ sql '''
+ drop table if exists cte_runtime_filter_table;
+ CREATE TABLE `cte_runtime_filter_table` (
+ `part_dt` bigint NOT NULL COMMENT '日期',
+ `group_id` bigint NOT NULL COMMENT '客群id',
+ `user_id` bigint NOT NULL COMMENT '用户id'
+ ) ENGINE=OLAP
+ UNIQUE KEY(`part_dt`, `group_id`, `user_id`)
+ DISTRIBUTED BY HASH(`user_id`) BUCKETS 8
+ PROPERTIES (
+ "replication_allocation" = "tag.location.default: 1"
+ );
+
+ insert into cte_runtime_filter_table values (1, 1, 1);
+
+ set inline_cte_referenced_threshold=0;
+ set disable_join_reorder = true;
+ set enable_runtime_filter_prune=false;
+ set runtime_filter_mode=global;
+ set runtime_filter_type=2;
+ '''
+
+ qt_shape_onerow '''
+ explain shape plan
+ with cte as ((select 1 as id))
+ select *
+ from cte a
+ join cte_runtime_filter_table b on a.id=b.user_id ;
+ '''
+
+ qt_exec_onerow'''
+ with cte as ((select 1 as id))
+ select *
+ from cte a
+ join cte_runtime_filter_table b on a.id=b.user_id ;
+ '''
+
+ qt_shape_cte_cte '''
+ explain shape plan
+ with cte as ((select * from cte_runtime_filter_table))
+ select *
+ from cte a
+ join cte_runtime_filter_table b on a.user_id=b.user_id ;
+ '''
+
+ qt_exec_cte_cte '''
+ with cte as ((select * from cte_runtime_filter_table))
+ select *
+ from cte a
+ join cte_runtime_filter_table b on a.user_id=b.user_id ;
+ '''
+}
\ No newline at end of file
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]