This is an automated email from the ASF dual-hosted git repository.

morrysnow pushed a commit to branch branch-3.1
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-3.1 by this push:
     new d600080a006 branch-3.1: [fix](nereids) fix bug when CTEConsumer is 
used as runtime filter target #51807 (#52750)
d600080a006 is described below

commit d600080a006c22868bc77e24f5568cc2efea095b
Author: github-actions[bot] 
<41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Tue Jul 8 14:14:18 2025 +0800

    branch-3.1: [fix](nereids) fix bug when CTEConsumer is used as runtime 
filter target #51807 (#52750)
    
    Cherry-picked from #51807
    
    Co-authored-by: minghong <[email protected]>
---
 .../glue/translator/RuntimeFilterTranslator.java   | 145 +++++++++++----------
 .../runtime_filter/cte-runtime-filter.out          | Bin 0 -> 1411 bytes
 .../runtime_filter/cte-runtime-filter.groovy       |  71 ++++++++++
 3 files changed, 149 insertions(+), 67 deletions(-)

diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/RuntimeFilterTranslator.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/RuntimeFilterTranslator.java
index 07e0af60173..d034145a463 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/RuntimeFilterTranslator.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/RuntimeFilterTranslator.java
@@ -41,6 +41,8 @@ import org.apache.doris.thrift.TRuntimeFilterType;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
 
 import java.util.ArrayList;
 import java.util.List;
@@ -50,6 +52,7 @@ import java.util.Map;
  * translate runtime filter
  */
 public class RuntimeFilterTranslator {
+    private static final Logger LOG = 
LogManager.getLogger(RuntimeFilterTranslator.class);
 
     private final RuntimeFilterContext context;
 
@@ -96,77 +99,85 @@ public class RuntimeFilterTranslator {
                 
.getIgnoredRuntimeFilterIds().contains(filter.getId().asInt())) {
             return;
         }
-        Expr src = ExpressionTranslator.translate(filter.getSrcExpr(), ctx);
-        List<Expr> targetExprList = new ArrayList<>();
-        List<Map<TupleId, List<SlotId>>> targetTupleIdMapList = new 
ArrayList<>();
-        List<ScanNode> scanNodeList = new ArrayList<>();
-        boolean hasInvalidTarget = false;
-        for (int i = 0; i < filter.getTargetExpressions().size(); i++) {
-            Slot curTargetSlot = filter.getTargetSlots().get(i);
-            Expression curTargetExpression = 
filter.getTargetExpressions().get(i);
-            SlotRef targetSlotRef = 
context.getExprIdToOlapScanNodeSlotRef().get(curTargetSlot.getExprId());
-            if (targetSlotRef == null) {
-                context.setTargetNullCount();
-                hasInvalidTarget = true;
-                break;
-            }
-            ScanNode scanNode = 
context.getScanNodeOfLegacyRuntimeFilterTarget().get(curTargetSlot);
-            Expr targetExpr;
-            if (curTargetSlot.equals(curTargetExpression)) {
-                targetExpr = targetSlotRef;
-            } else {
-                // map nereids target slot to original planner slot
-                
Preconditions.checkArgument(curTargetExpression.getInputSlots().size() == 1,
-                        "target expression is invalid, input slot num > 1; 
filter :" + filter);
-                Slot slotInTargetExpression = 
curTargetExpression.getInputSlots().iterator().next();
-                
Preconditions.checkArgument(slotInTargetExpression.equals(curTargetSlot)
-                        || 
curTargetSlot.equals(context.getAliasTransferMap().get(slotInTargetExpression).second));
-                RuntimeFilterExpressionTranslator translator = new 
RuntimeFilterExpressionTranslator(targetSlotRef);
-                targetExpr = curTargetExpression.accept(translator, ctx);
-            }
+        try {
+            Expr src = ExpressionTranslator.translate(filter.getSrcExpr(), 
ctx);
+            List<Expr> targetExprList = new ArrayList<>();
+            List<Map<TupleId, List<SlotId>>> targetTupleIdMapList = new 
ArrayList<>();
+            List<ScanNode> scanNodeList = new ArrayList<>();
+            boolean hasInvalidTarget = false;
+            for (int i = 0; i < filter.getTargetExpressions().size(); i++) {
+                Slot curTargetSlot = filter.getTargetSlots().get(i);
+                Expression curTargetExpression = 
filter.getTargetExpressions().get(i);
+                SlotRef targetSlotRef = 
context.getExprIdToOlapScanNodeSlotRef().get(curTargetSlot.getExprId());
+                if (targetSlotRef == null) {
+                    context.setTargetNullCount();
+                    hasInvalidTarget = true;
+                    break;
+                }
+                ScanNode scanNode = 
context.getScanNodeOfLegacyRuntimeFilterTarget().get(curTargetSlot);
+                Expr targetExpr;
+                if (curTargetSlot.equals(curTargetExpression)) {
+                    targetExpr = targetSlotRef;
+                } else {
+                    // map nereids target slot to original planner slot
+                    
Preconditions.checkArgument(curTargetExpression.getInputSlots().size() == 1,
+                            "target expression is invalid, input slot num > 1; 
filter :" + filter);
+                    Slot slotInTargetExpression = 
curTargetExpression.getInputSlots().iterator().next();
+                    
Preconditions.checkArgument(slotInTargetExpression.equals(curTargetSlot)
+                            || 
curTargetSlot.equals(context.getAliasTransferMap().get(slotInTargetExpression).second));
+                    RuntimeFilterExpressionTranslator translator = new 
RuntimeFilterExpressionTranslator(targetSlotRef);
+                    targetExpr = curTargetExpression.accept(translator, ctx);
+                }
 
-            // adjust data type
-            if (!src.getType().equals(targetExpr.getType()) && 
filter.getType() != TRuntimeFilterType.BITMAP) {
-                targetExpr = new CastExpr(src.getType(), targetExpr);
+                // adjust data type
+                if (!src.getType().equals(targetExpr.getType()) && 
filter.getType() != TRuntimeFilterType.BITMAP) {
+                    targetExpr = new CastExpr(src.getType(), targetExpr);
+                }
+                TupleId targetTupleId = 
targetSlotRef.getDesc().getParent().getId();
+                SlotId targetSlotId = targetSlotRef.getSlotId();
+                scanNodeList.add(scanNode);
+                targetExprList.add(targetExpr);
+                targetTupleIdMapList.add(ImmutableMap.of(targetTupleId, 
ImmutableList.of(targetSlotId)));
             }
-            SlotRef targetSlot = targetSlotRef.getSrcSlotRef();
-            TupleId targetTupleId = targetSlot.getDesc().getParent().getId();
-            SlotId targetSlotId = targetSlot.getSlotId();
-            scanNodeList.add(scanNode);
-            targetExprList.add(targetExpr);
-            targetTupleIdMapList.add(ImmutableMap.of(targetTupleId, 
ImmutableList.of(targetSlotId)));
-        }
-        if (!hasInvalidTarget) {
-            org.apache.doris.planner.RuntimeFilter origFilter
-                    = 
org.apache.doris.planner.RuntimeFilter.fromNereidsRuntimeFilter(
-                    filter, node, src, targetExprList,
-                    targetTupleIdMapList, context.getLimits());
-            if (node instanceof HashJoinNode) {
-                origFilter.setIsBroadcast(((HashJoinNode) 
node).getDistributionMode() == DistributionMode.BROADCAST);
-                origFilter.setSingleEq(((HashJoinNode) 
node).getEqJoinConjuncts().size());
-            } else {
-                // nest loop join
-                origFilter.setIsBroadcast(true);
+            if (!hasInvalidTarget) {
+                org.apache.doris.planner.RuntimeFilter origFilter
+                        = 
org.apache.doris.planner.RuntimeFilter.fromNereidsRuntimeFilter(
+                        filter, node, src, targetExprList,
+                        targetTupleIdMapList, context.getLimits());
+                if (node instanceof HashJoinNode) {
+                    origFilter.setIsBroadcast(
+                            ((HashJoinNode) node).getDistributionMode() == 
DistributionMode.BROADCAST);
+                    origFilter.setSingleEq(((HashJoinNode) 
node).getEqJoinConjuncts().size());
+                } else {
+                    // nest loop join
+                    origFilter.setIsBroadcast(true);
+                }
+                boolean isLocalTarget = scanNodeList.stream().allMatch(e ->
+                        !(e instanceof CTEScanNode) && 
e.getFragmentId().equals(node.getFragmentId()));
+                for (int i = 0; i < targetExprList.size(); i++) {
+                    ScanNode scanNode = scanNodeList.get(i);
+                    Expr targetExpr = targetExprList.get(i);
+                    origFilter.addTarget(new RuntimeFilterTarget(
+                            scanNode, targetExpr, true, isLocalTarget));
+                }
+                origFilter.setBitmapFilterNotIn(filter.isBitmapFilterNotIn());
+                
origFilter.setBloomFilterSizeCalculatedByNdv(filter.isBloomFilterSizeCalculatedByNdv());
+                org.apache.doris.planner.RuntimeFilter finalizedFilter = 
finalize(origFilter);
+                scanNodeList.stream().filter(e -> e.getStatisticalType() == 
StatisticalType.CTE_SCAN_NODE)
+                        .forEach(f -> {
+                            DataStreamSink sink = 
context.getPlanNodeIdToCTEDataSinkMap().get(f.getId());
+                            if (sink != null) {
+                                sink.addRuntimeFilter(finalizedFilter);
+                            }
+                        });
+                context.getLegacyFilters().add(finalizedFilter);
             }
-            boolean isLocalTarget = scanNodeList.stream().allMatch(e ->
-                    !(e instanceof CTEScanNode) && 
e.getFragmentId().equals(node.getFragmentId()));
-            for (int i = 0; i < targetExprList.size(); i++) {
-                ScanNode scanNode = scanNodeList.get(i);
-                Expr targetExpr = targetExprList.get(i);
-                origFilter.addTarget(new RuntimeFilterTarget(
-                        scanNode, targetExpr, true, isLocalTarget));
+        } catch (Exception e) {
+            LOG.info("failed to translate runtime filter: " + e.getMessage());
+            // throw exception in debug mode
+            if (ConnectContext.get() != null && 
ConnectContext.get().getSessionVariable().feDebug) {
+                throw e;
             }
-            origFilter.setBitmapFilterNotIn(filter.isBitmapFilterNotIn());
-            
origFilter.setBloomFilterSizeCalculatedByNdv(filter.isBloomFilterSizeCalculatedByNdv());
-            org.apache.doris.planner.RuntimeFilter finalizedFilter = 
finalize(origFilter);
-            scanNodeList.stream().filter(e -> e.getStatisticalType() == 
StatisticalType.CTE_SCAN_NODE)
-                                 .forEach(f -> {
-                                     DataStreamSink sink = 
context.getPlanNodeIdToCTEDataSinkMap().get(f.getId());
-                                     if (sink != null) {
-                                         
sink.addRuntimeFilter(finalizedFilter);
-                                     }
-                                 });
-            context.getLegacyFilters().add(finalizedFilter);
         }
     }
 
diff --git 
a/regression-test/data/nereids_p0/runtime_filter/cte-runtime-filter.out 
b/regression-test/data/nereids_p0/runtime_filter/cte-runtime-filter.out
new file mode 100644
index 00000000000..cff6f05e5b3
Binary files /dev/null and 
b/regression-test/data/nereids_p0/runtime_filter/cte-runtime-filter.out differ
diff --git 
a/regression-test/suites/nereids_p0/runtime_filter/cte-runtime-filter.groovy 
b/regression-test/suites/nereids_p0/runtime_filter/cte-runtime-filter.groovy
new file mode 100644
index 00000000000..b5ce12e0851
--- /dev/null
+++ b/regression-test/suites/nereids_p0/runtime_filter/cte-runtime-filter.groovy
@@ -0,0 +1,71 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+suite('cte-runtime-filter') {
+    
+    sql '''
+    drop table if exists cte_runtime_filter_table;
+    CREATE TABLE `cte_runtime_filter_table` (
+    `part_dt` bigint NOT NULL COMMENT '日期',
+    `group_id` bigint NOT NULL COMMENT '客群id',
+    `user_id` bigint NOT NULL COMMENT '用户id'
+    ) ENGINE=OLAP
+    UNIQUE KEY(`part_dt`, `group_id`, `user_id`)
+    DISTRIBUTED BY HASH(`user_id`) BUCKETS 8
+    PROPERTIES (
+    "replication_allocation" = "tag.location.default: 1"
+    );
+
+    insert into cte_runtime_filter_table values (1, 1, 1);
+
+    set inline_cte_referenced_threshold=0;
+    set disable_join_reorder = true;
+    set enable_runtime_filter_prune=false;
+    set runtime_filter_mode=global;
+    set runtime_filter_type=2;
+    '''
+    
+    qt_shape_onerow '''
+        explain shape plan
+        with cte as ((select 1 as id))
+        select * 
+        from cte a
+        join cte_runtime_filter_table b on a.id=b.user_id ;
+    '''
+
+    qt_exec_onerow'''
+        with cte as ((select 1 as id))
+        select * 
+        from cte a
+        join cte_runtime_filter_table b on a.id=b.user_id ;
+    '''
+
+    qt_shape_cte_cte '''
+        explain shape plan
+        with cte as ((select * from cte_runtime_filter_table))
+        select * 
+        from cte a
+        join cte_runtime_filter_table b on a.user_id=b.user_id ;
+        '''
+
+    qt_exec_cte_cte '''
+        with cte as ((select * from cte_runtime_filter_table))
+        select * 
+        from cte a
+        join cte_runtime_filter_table b on a.user_id=b.user_id ;
+        '''
+}
\ No newline at end of file


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to