This is an automated email from the ASF dual-hosted git repository.

924060929 pushed a commit to branch fe_local_shuffle_rebase
in repository https://gitbox.apache.org/repos/asf/doris.git

commit 48ebe9f2039f8976ba4cfe2ae8f86a7846155970
Author: 924060929 <[email protected]>
AuthorDate: Tue May 19 22:10:18 2026 +0800

    [fix](local shuffle) Skip LocalExchange under RecursiveCteNode and mark it serial (DORIS-25865)
    
    The FE local-shuffle planner used to insert a LocalExchangeNode directly
    under RecursiveCteNode, which broke two RecursiveCte invariants:
    
    1. ThriftPlansBuilder locates the recursive sender fragment via
       `recursiveCteNode.getChild(1).getChild(0).getFragment()`.  A wrapper LE
       between RecCte and the cross-fragment ExchangeNode shifts that path off
       the receiver and pulls the recursive producer fragment into
       `fragmentsToReset`, so BE rejects with
         [INTERNAL_ERROR]Fragment N contains a recursive CTE node
       from RecCTESourceOperatorX::prepare().
    
    2. BE's RecCTESourceOperatorX::is_serial_operator() always returns true.
       RecursiveCteNode#isSerialNode() on the FE side defaulted to false, so
       the planner left the producer fragment with parallel=N sender pipelines
       even though only one instance actually emits data.  The downstream
       cross-fragment Exchange then waits forever on the N-1 silent senders.
    
    Fix in RecursiveCteNode:
      - override isSerialNode() to return true so addLocalExchangeForFragment
        wraps the fragment root with PASSTHROUGH LE and fans the single
        producer out to N parallel sinks (mirrors BE-native behaviour);
      - override enforceAndDeriveLocalExchange to call children's own
        enforceAndDeriveLocalExchange directly, bypassing the framework's
        enforceRequire so no LE gets inserted between RecCte and its
        cross-fragment Exchange children — children's subtrees still get LE
        planning as normal.
    
    Add regression test test_local_shuffle_recursive_cte covering the three
    downstream consumer shapes the JIRA listed (aggregate, window, grouping
    sets) plus a hash-join case and a negative control:
      rec_cte_agg, rec_cte_window, rec_cte_grouping_sets, rec_cte_select,
      rec_cte_join.  Each is asserted to produce identical rows under
      enable_local_shuffle_planner=true vs =false.
---
 .../org/apache/doris/planner/RecursiveCteNode.java |  30 +++-
 .../test_local_shuffle_recursive_cte.groovy        | 181 +++++++++++++++++++++
 2 files changed, 209 insertions(+), 2 deletions(-)

diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/RecursiveCteNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/RecursiveCteNode.java
index 3a7bdd63745..a32b599374a 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/RecursiveCteNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/RecursiveCteNode.java
@@ -92,14 +92,40 @@ public class RecursiveCteNode extends PlanNode {
                 .add("isUnionAll", isUnionAll).toString();
     }
 
+    @Override
+    public boolean isSerialNode() {
+        // Mirror BE's RecCTESourceOperatorX::is_serial_operator() which always returns true:
+        // the recursive driver runs sequentially in one task, so downstream consumers must see
+        // RecursiveCteNode as serial too.  Without this, FE planner leaves the producer
+        // fragment with parallel=N senders but only one actually emits data — the cross-
+        // fragment Exchange receiver expects N senders done and hangs waiting on the other
+        // N-1.  Marking serial here lets AddLocalExchange#addLocalExchangeForFragment wrap
+        // the root with a PASSTHROUGH LE that fans the serial RecCte output out to N
+        // parallel sinks, matching BE-native _plan_local_exchange behaviour.
+        return true;
+    }
+
     @Override
     public Pair<PlanNode, LocalExchangeType> enforceAndDeriveLocalExchange(
             PlanTranslatorContext translatorContext, PlanNode parent, 
LocalExchangeTypeRequire parentRequire) {
+        // Recurse into children to give them a chance to plan local exchanges 
below
+        // themselves, but never insert one *directly* under RecursiveCteNode:
+        //   - ThriftPlansBuilder locates the recursive sender fragment via
+        //     `getChild(1).getChild(0).getFragment()`; a LocalExchangeNode 
wrapper
+        //     would shift that path off the cross-fragment ExchangeNode and 
pull the
+        //     wrong fragment into `fragmentsToReset`.
+        //   - BE's RecCTESourceOperatorX wires the anchor / recursive side 
pipelines
+        //     directly against the Exchange children 
(pipeline_fragment_context.cpp
+        //     REC_CTE_NODE handling); injecting an extra LE pipeline between 
them
+        //     mis-routes the rerun signal and crashes BE during execution.
+        // Both issues are pure shape mismatches — RecursiveCteNode's children 
are
+        // already the cross-fragment ExchangeNode receivers, which BE drives 
serially
+        // itself, so no FE-side fan-out is needed here.
         ArrayList<PlanNode> newChildren = Lists.newArrayList();
         for (int i = 0; i < children.size(); i++) {
             PlanNode child = children.get(i);
-            Pair<PlanNode, LocalExchangeType> childOutput = enforceRequire(
-                    translatorContext, child, i, 
LocalExchangeTypeRequire.noRequire());
+            Pair<PlanNode, LocalExchangeType> childOutput = 
child.enforceAndDeriveLocalExchange(
+                    translatorContext, this, 
LocalExchangeTypeRequire.noRequire());
             newChildren.add(childOutput.first);
         }
         this.children = newChildren;
diff --git a/regression-test/suites/nereids_p0/local_shuffle/test_local_shuffle_recursive_cte.groovy b/regression-test/suites/nereids_p0/local_shuffle/test_local_shuffle_recursive_cte.groovy
new file mode 100644
index 00000000000..6b5edeefbd2
--- /dev/null
+++ b/regression-test/suites/nereids_p0/local_shuffle/test_local_shuffle_recursive_cte.groovy
@@ -0,0 +1,181 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+/**
+ * Regression for DORIS-25865: FE local-shuffle planner used to insert a
+ * LocalExchangeNode directly under RecursiveCteNode, which collided with two
+ * RecursiveCte invariants:
+ *
+ *   1. ThriftPlansBuilder locates the recursive sender fragment via
+ *      `recursiveCteNode.getChild(1).getChild(0).getFragment()`.  An extra LE
+ *      wrapper shifted that path off the cross-fragment ExchangeNode and
+ *      pulled the RecCTE producer fragment itself into `fragmentsToReset`.
+ *      BE then rejected with `[INTERNAL_ERROR]Fragment N contains a recursive
+ *      CTE node` during `RecCTESourceOperatorX::prepare()`.
+ *
+ *   2. BE's `RecCTESourceOperatorX::is_serial_operator()` always returns true,
+ *      but `RecursiveCteNode.isSerialNode()` on the FE side defaulted to
+ *      false.  Without the serial marker, the FE planner left the producer
+ *      fragment with parallel=N sender pipelines while RecCte actually emits
+ *      data from a single instance — the cross-fragment Exchange receiver
+ *      waited forever on the N-1 silent senders and the query hung.
+ *
+ * Fix lives in `RecursiveCteNode`:
+ *   - override `isSerialNode()` to return true (mirrors BE),
+ *   - override `enforceAndDeriveLocalExchange` to bypass the framework's
+ *     `enforceRequire` so no LE is inserted between RecCte and its Exchange
+ *     children (children's own subtrees still get LE planning).
+ *
+ * This test asserts:
+ *   - planner=true succeeds (no "Fragment N contains a recursive CTE node");
+ *   - results between planner=true and planner=false are identical for the
+ *     three downstream-consumer shapes the JIRA listed (aggregate, window,
+ *     grouping sets) and for a hash join between two recursive CTEs;
+ *   - the negative control (RecCte directly consumed by SELECT) still works
+ *     in both modes — it would pass even with the original bug, but covers
+ *     the simple path so a regression there is caught immediately.
+ */
+suite("test_local_shuffle_recursive_cte", "nereids_p0") {
+
+    sql "SET enable_nereids_planner=true"
+    sql "SET enable_fallback_to_original_planner=false"
+    sql "SET enable_sql_cache=false"
+    sql "SET enable_local_shuffle=true"
+    sql "SET parallel_pipeline_task_num=4"
+    sql "SET runtime_filter_mode=off"
+
+    // For each SQL, run it twice — once with FE planner, once with BE planner —
+    // and assert result rows are identical.  Plan shape intentionally not asserted:
+    // the two planners legitimately differ on LE placement.
+    def checkConsistency = { String tag, String testSql ->
+        def sqlOn = """SELECT /*+SET_VAR(enable_local_shuffle_planner=true)*/""" + testSql.replaceFirst(/(?i)^\s*select/, "")
+        def sqlOff = """SELECT /*+SET_VAR(enable_local_shuffle_planner=false)*/""" + testSql.replaceFirst(/(?i)^\s*select/, "")
+        check_sql_equal(sqlOn, sqlOff)
+    }
+
+    // ============================================================
+    //  Case 1 — Recursive CTE consumed by aggregate
+    //
+    //  Original error with FE planner:
+    //    errCode = 2, detailMessage = [INTERNAL_ERROR]
+    //    Fragment N contains a recursive CTE node
+    // ============================================================
+    checkConsistency("rec_cte_agg", """
+        SELECT n_mod, count(*) AS c, sum(s) AS total
+        FROM (
+            WITH RECURSIVE cte(n, s) AS (
+                SELECT CAST(1 AS INT), CAST(1 AS BIGINT)
+                UNION ALL
+                SELECT CAST(n + 1 AS INT), CAST(s + n + 1 AS BIGINT)
+                FROM cte WHERE n < 30
+            )
+            SELECT n % 7 AS n_mod, s FROM cte
+        ) t
+        GROUP BY n_mod
+        ORDER BY n_mod
+    """)
+
+    // ============================================================
+    //  Case 2 — Recursive CTE consumed by window function
+    //
+    //  Original failure with FE planner: hung indefinitely because the
+    //  producer fragment had parallel=N sender pipelines but only one of
+    //  them actually emits data.  Fixed by marking RecursiveCteNode serial
+    //  so AddLocalExchange wraps the root with a PASSTHROUGH LE that fans
+    //  the single producer out to N parallel sinks.
+    // ============================================================
+    checkConsistency("rec_cte_window", """
+        SELECT n, sum(n) OVER (PARTITION BY n % 5) AS sum_n
+        FROM (
+            WITH RECURSIVE cte(n) AS (
+                SELECT CAST(1 AS INT)
+                UNION ALL
+                SELECT CAST(n + 1 AS INT) FROM cte WHERE n < 30
+            )
+            SELECT n FROM cte
+        ) t
+        ORDER BY n
+    """)
+
+    // ============================================================
+    //  Case 3 — Recursive CTE consumed by GROUPING SETS
+    //
+    //  Suggested by the JIRA reporter as a third "downstream operator that
+    //  introduces additional fragments / local exchanges" — together with
+    //  aggregate and window it covers the original failure pattern.
+    // ============================================================
+    checkConsistency("rec_cte_grouping_sets", """
+        SELECT n_mod, n_bucket, count(*) AS c, sum(s) AS total
+        FROM (
+            WITH RECURSIVE cte(n, s) AS (
+                SELECT CAST(1 AS INT), CAST(1 AS BIGINT)
+                UNION ALL
+                SELECT CAST(n + 1 AS INT), CAST(s + n + 1 AS BIGINT)
+                FROM cte WHERE n < 30
+            )
+            SELECT n % 7 AS n_mod, n % 3 AS n_bucket, s FROM cte
+        ) t
+        GROUP BY GROUPING SETS ((n_mod), (n_bucket), (n_mod, n_bucket))
+        ORDER BY n_mod NULLS LAST, n_bucket NULLS LAST
+    """)
+
+    // ============================================================
+    //  Negative control — RecCte directly consumed by SELECT.
+    //  This path didn't generate the extra fragments needed to trigger the
+    //  original bug, but exercising it ensures the fix doesn't regress the
+    //  simple consumer shape.
+    // ============================================================
+    checkConsistency("rec_cte_select", """
+        SELECT n
+        FROM (
+            WITH RECURSIVE cte(n) AS (
+                SELECT CAST(1 AS INT)
+                UNION ALL
+                SELECT CAST(n + 1 AS INT) FROM cte WHERE n < 5
+            )
+            SELECT n FROM cte
+        ) t
+        ORDER BY n
+    """)
+
+    // ============================================================
+    //  Case 4 — RecCte feeding a hash JOIN
+    //
+    //  Another downstream consumer that introduces an extra fragment via a
+    //  shuffle join.  Verifies the serial-RecCte → PASSTHROUGH LE wrap also
+    //  works when the consumer requires hash distribution.
+    // ============================================================
+    checkConsistency("rec_cte_join", """
+        SELECT a.n, b.n AS m
+        FROM (
+            WITH RECURSIVE cte(n) AS (
+                SELECT CAST(1 AS INT)
+                UNION ALL
+                SELECT CAST(n + 1 AS INT) FROM cte WHERE n < 10
+            )
+            SELECT n FROM cte
+        ) a JOIN (
+            WITH RECURSIVE cte(n) AS (
+                SELECT CAST(2 AS INT)
+                UNION ALL
+                SELECT CAST(n + 2 AS INT) FROM cte WHERE n < 10
+            )
+            SELECT n FROM cte
+        ) b ON a.n = b.n
+        ORDER BY a.n
+    """)
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to