This is an automated email from the ASF dual-hosted git repository. 924060929 pushed a commit to branch fe_local_shuffle_rebase in repository https://gitbox.apache.org/repos/asf/doris.git
commit 48ebe9f2039f8976ba4cfe2ae8f86a7846155970 Author: 924060929 <[email protected]> AuthorDate: Tue May 19 22:10:18 2026 +0800 [fix](local shuffle) Skip LocalExchange under RecursiveCteNode and mark it serial (DORIS-25865) The FE local-shuffle planner used to insert a LocalExchangeNode directly under RecursiveCteNode, which broke two RecursiveCte invariants: 1. ThriftPlansBuilder locates the recursive sender fragment via `recursiveCteNode.getChild(1).getChild(0).getFragment()`. A wrapper LocalExchangeNode (LE) between RecCte and the cross-fragment ExchangeNode shifts that path off the receiver and pulls the recursive producer fragment into `fragmentsToReset`, so BE rejects the query with `[INTERNAL_ERROR]Fragment N contains a recursive CTE node` from RecCTESourceOperatorX::prepare(). 2. BE's RecCTESourceOperatorX::is_serial_operator() always returns true. RecursiveCteNode#isSerialNode() on the FE side defaulted to false, so the planner left the producer fragment with parallel=N sender pipelines even though only one instance actually emits data. The downstream cross-fragment Exchange then waits forever on the N-1 silent senders. Fix in RecursiveCteNode: - override isSerialNode() to return true so addLocalExchangeForFragment wraps the fragment root with a PASSTHROUGH LE and fans the single producer out to N parallel sinks (mirrors BE-native behaviour); - override enforceAndDeriveLocalExchange to call children's own enforceAndDeriveLocalExchange directly, bypassing the framework's enforceRequire so no LE gets inserted between RecCte and its cross-fragment Exchange children — children's subtrees still get LE planning as normal. Add regression test test_local_shuffle_recursive_cte covering the three downstream consumer shapes listed in the JIRA (aggregate, window, grouping sets) plus a join case and a negative control: rec_cte_agg, rec_cte_window, rec_cte_grouping_sets, rec_cte_select, rec_cte_join. Each is asserted to produce identical rows under enable_local_shuffle_planner=true vs =false. 
--- .../org/apache/doris/planner/RecursiveCteNode.java | 30 +++- .../test_local_shuffle_recursive_cte.groovy | 181 +++++++++++++++++++++ 2 files changed, 209 insertions(+), 2 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/RecursiveCteNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/RecursiveCteNode.java index 3a7bdd63745..a32b599374a 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/RecursiveCteNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/RecursiveCteNode.java @@ -92,14 +92,40 @@ public class RecursiveCteNode extends PlanNode { .add("isUnionAll", isUnionAll).toString(); } + @Override + public boolean isSerialNode() { + // Mirror BE's RecCTESourceOperatorX::is_serial_operator() which always returns true: + // the recursive driver runs sequentially in one task, so downstream consumers must see + // RecursiveCteNode as serial too. Without this, FE planner leaves the producer + // fragment with parallel=N senders but only one actually emits data — the cross- + // fragment Exchange receiver expects N senders done and hangs waiting on the other + // N-1. Marking serial here lets AddLocalExchange#addLocalExchangeForFragment wrap + // the root with a PASSTHROUGH LE that fans the serial RecCte output out to N + // parallel sinks, matching BE-native _plan_local_exchange behaviour. + return true; + } + @Override public Pair<PlanNode, LocalExchangeType> enforceAndDeriveLocalExchange( PlanTranslatorContext translatorContext, PlanNode parent, LocalExchangeTypeRequire parentRequire) { + // Recurse into children to give them a chance to plan local exchanges below + // themselves, but never insert one *directly* under RecursiveCteNode: + // - ThriftPlansBuilder locates the recursive sender fragment via + // `getChild(1).getChild(0).getFragment()`; a LocalExchangeNode wrapper + // would shift that path off the cross-fragment ExchangeNode and pull the + // wrong fragment into `fragmentsToReset`. 
+ // - BE's RecCTESourceOperatorX wires the anchor / recursive side pipelines + // directly against the Exchange children (pipeline_fragment_context.cpp + // REC_CTE_NODE handling); injecting an extra LE pipeline between them + // mis-routes the rerun signal and crashes BE during execution. + // Both issues are pure shape mismatches — RecursiveCteNode's children are + // already the cross-fragment ExchangeNode receivers, which BE drives serially + // itself, so no FE-side fan-out is needed here. ArrayList<PlanNode> newChildren = Lists.newArrayList(); for (int i = 0; i < children.size(); i++) { PlanNode child = children.get(i); - Pair<PlanNode, LocalExchangeType> childOutput = enforceRequire( - translatorContext, child, i, LocalExchangeTypeRequire.noRequire()); + Pair<PlanNode, LocalExchangeType> childOutput = child.enforceAndDeriveLocalExchange( + translatorContext, this, LocalExchangeTypeRequire.noRequire()); newChildren.add(childOutput.first); } this.children = newChildren; diff --git a/regression-test/suites/nereids_p0/local_shuffle/test_local_shuffle_recursive_cte.groovy b/regression-test/suites/nereids_p0/local_shuffle/test_local_shuffle_recursive_cte.groovy new file mode 100644 index 00000000000..6b5edeefbd2 --- /dev/null +++ b/regression-test/suites/nereids_p0/local_shuffle/test_local_shuffle_recursive_cte.groovy @@ -0,0 +1,181 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +/** + * Regression for DORIS-25865: FE local-shuffle planner used to insert a + * LocalExchangeNode directly under RecursiveCteNode, which collided with two + * RecursiveCte invariants: + * + * 1. ThriftPlansBuilder locates the recursive sender fragment via + * `recursiveCteNode.getChild(1).getChild(0).getFragment()`. An extra LE + * wrapper shifted that path off the cross-fragment ExchangeNode and + * pulled the RecCTE producer fragment itself into `fragmentsToReset`. + * BE then rejected with `[INTERNAL_ERROR]Fragment N contains a recursive + * CTE node` during `RecCTESourceOperatorX::prepare()`. + * + * 2. BE's `RecCTESourceOperatorX::is_serial_operator()` always returns true, + * but `RecursiveCteNode.isSerialNode()` on the FE side defaulted to + * false. Without the serial marker, the FE planner left the producer + * fragment with parallel=N sender pipelines while RecCte actually emits + * data from a single instance — the cross-fragment Exchange receiver + * waited forever on the N-1 silent senders and the query hung. + * + * Fix lives in `RecursiveCteNode`: + * - override `isSerialNode()` to return true (mirrors BE), + * - override `enforceAndDeriveLocalExchange` to bypass the framework's + * `enforceRequire` so no LE is inserted between RecCte and its Exchange + * children (children's own subtrees still get LE planning). 
+ * + * This test asserts: + * - planner=true succeeds (no "Fragment N contains a recursive CTE node"); + * - results between planner=true and planner=false are identical for the + * three downstream-consumer shapes the JIRA listed: aggregate, window, + * grouping sets; + * - the negative control (RecCte directly consumed by SELECT) still works + * in both modes — it would pass even with the original bug, but covers + * the simple path so a regression there is caught immediately. + */ +suite("test_local_shuffle_recursive_cte", "nereids_p0") { + + sql "SET enable_nereids_planner=true" + sql "SET enable_fallback_to_original_planner=false" + sql "SET enable_sql_cache=false" + sql "SET enable_local_shuffle=true" + sql "SET parallel_pipeline_task_num=4" + sql "SET runtime_filter_mode=off" + + // For each SQL, run it twice — once with FE planner, once with BE planner — + // and assert result rows are identical. Plan shape intentionally not asserted: + // the two planners legitimately differ on LE placement. 
+ def checkConsistency = { String tag, String testSql -> + def sqlOn = """SELECT /*+SET_VAR(enable_local_shuffle_planner=true)*/""" + testSql.replaceFirst(/(?i)^\s*select/, "") + def sqlOff = """SELECT /*+SET_VAR(enable_local_shuffle_planner=false)*/""" + testSql.replaceFirst(/(?i)^\s*select/, "") + check_sql_equal(sqlOn, sqlOff) + } + + // ============================================================ + // Case 1 — Recursive CTE consumed by aggregate + // + // Original error with FE planner: + // errCode = 2, detailMessage = [INTERNAL_ERROR] + // Fragment N contains a recursive CTE node + // ============================================================ + checkConsistency("rec_cte_agg", """ + SELECT n_mod, count(*) AS c, sum(s) AS total + FROM ( + WITH RECURSIVE cte(n, s) AS ( + SELECT CAST(1 AS INT), CAST(1 AS BIGINT) + UNION ALL + SELECT CAST(n + 1 AS INT), CAST(s + n + 1 AS BIGINT) + FROM cte WHERE n < 30 + ) + SELECT n % 7 AS n_mod, s FROM cte + ) t + GROUP BY n_mod + ORDER BY n_mod + """) + + // ============================================================ + // Case 2 — Recursive CTE consumed by window function + // + // Original failure with FE planner: hung indefinitely because the + // producer fragment had parallel=N sender pipelines but only one of + // them actually emits data. Fixed by marking RecursiveCteNode serial + // so AddLocalExchange wraps the root with a PASSTHROUGH LE that fans + // the single producer out to N parallel sinks. 
+ // ============================================================ + checkConsistency("rec_cte_window", """ + SELECT n, sum(n) OVER (PARTITION BY n % 5) AS sum_n + FROM ( + WITH RECURSIVE cte(n) AS ( + SELECT CAST(1 AS INT) + UNION ALL + SELECT CAST(n + 1 AS INT) FROM cte WHERE n < 30 + ) + SELECT n FROM cte + ) t + ORDER BY n + """) + + // ============================================================ + // Case 3 — Recursive CTE consumed by GROUPING SETS + // + // Suggested by the JIRA reporter as a third "downstream operator that + // introduces additional fragments / local exchanges" — together with + // aggregate and window it covers the original failure pattern. + // ============================================================ + checkConsistency("rec_cte_grouping_sets", """ + SELECT n_mod, n_bucket, count(*) AS c, sum(s) AS total + FROM ( + WITH RECURSIVE cte(n, s) AS ( + SELECT CAST(1 AS INT), CAST(1 AS BIGINT) + UNION ALL + SELECT CAST(n + 1 AS INT), CAST(s + n + 1 AS BIGINT) + FROM cte WHERE n < 30 + ) + SELECT n % 7 AS n_mod, n % 3 AS n_bucket, s FROM cte + ) t + GROUP BY GROUPING SETS ((n_mod), (n_bucket), (n_mod, n_bucket)) + ORDER BY n_mod NULLS LAST, n_bucket NULLS LAST + """) + + // ============================================================ + // Negative control — RecCte directly consumed by SELECT. + // This path didn't generate the extra fragments needed to trigger the + // original bug, but exercising it ensures the fix doesn't regress the + // simple consumer shape. 
+ // ============================================================ + checkConsistency("rec_cte_select", """ + SELECT n + FROM ( + WITH RECURSIVE cte(n) AS ( + SELECT CAST(1 AS INT) + UNION ALL + SELECT CAST(n + 1 AS INT) FROM cte WHERE n < 5 + ) + SELECT n FROM cte + ) t + ORDER BY n + """) + + // ============================================================ + // Case 4 — RecCte feeding a hash JOIN + // + // Another downstream consumer that introduces an extra fragment via a + // shuffle join. Verifies the serial-RecCte → PASSTHROUGH LE wrap also + // works when the consumer requires hash distribution. + // ============================================================ + checkConsistency("rec_cte_join", """ + SELECT a.n, b.n AS m + FROM ( + WITH RECURSIVE cte(n) AS ( + SELECT CAST(1 AS INT) + UNION ALL + SELECT CAST(n + 1 AS INT) FROM cte WHERE n < 10 + ) + SELECT n FROM cte + ) a JOIN ( + WITH RECURSIVE cte(n) AS ( + SELECT CAST(2 AS INT) + UNION ALL + SELECT CAST(n + 2 AS INT) FROM cte WHERE n < 10 + ) + SELECT n FROM cte + ) b ON a.n = b.n + ORDER BY a.n + """) +} --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
