This is an automated email from the ASF dual-hosted git repository.

924060929 pushed a commit to branch fe_local_shuffle_rebase
in repository https://gitbox.apache.org/repos/asf/doris.git

commit fb95e9b9eaab170708078bad8a60945957daafa6
Author: 924060929 <[email protected]>
AuthorDate: Tue May 19 17:24:08 2026 +0800

    [test](local shuffle) Rewrite LocalExchangePlannerTest with PlanShape DSL
    
    - Delete duplicate testAggFromScanUsesLocalExecutionHashShuffle
    - Rewrite 8 substring-based tests as DSL shape assertions
    - Add testUnionAllScanAndValues (Tier B from Trino)
    - Add assertNoLocalExchangeOfType helper for negative checks
    - Add nestedLoopJoin/partitionSort/olapScan() factories to PlanShape(Dsl)
---
 .../java/org/apache/doris/planner/PlanShape.java   |  21 +
 .../org/apache/doris/planner/PlanShapeDsl.java     |  25 +
 .../apache/doris/qe/LocalExchangePlannerTest.java  | 532 +++++++++++++++------
 3 files changed, 422 insertions(+), 156 deletions(-)

diff --git a/fe/fe-core/src/test/java/org/apache/doris/planner/PlanShape.java b/fe/fe-core/src/test/java/org/apache/doris/planner/PlanShape.java
index 944825fdb38..39022c4283a 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/planner/PlanShape.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/planner/PlanShape.java
@@ -133,6 +133,10 @@ public final class PlanShape<T extends PlanNode> {
                 "OlapScan(" + tableName + ")");
     }
 
+    public static PlanShape<OlapScanNode> olapScan() {
+        return node(OlapScanNode.class);
+    }
+
     public static PlanShape<ExchangeNode> exchange(PlanShape<?>... children) {
         return node(ExchangeNode.class, children);
     }
@@ -153,6 +157,14 @@ public final class PlanShape<T extends PlanNode> {
         return node(UnionNode.class, children);
     }
 
+    public static PlanShape<NestedLoopJoinNode> nestedLoopJoin(PlanShape<?>... children) {
+        return node(NestedLoopJoinNode.class, children);
+    }
+
+    public static PlanShape<PartitionSortNode> partitionSort(PlanShape<?>... children) {
+        return node(PartitionSortNode.class, children);
+    }
+
     // ---- chained predicate ----
 
     /**
@@ -265,6 +277,15 @@ public final class PlanShape<T extends PlanNode> {
         return sb.toString();
     }
 
+    /**
+     * Render a plan tree as a text outline (one node per line, indented by depth).
+     * Useful for debugging when writing new shape assertions — print the actual
+     * plan, then copy the structure into a {@code PlanShape} pattern.
+     */
+    public static String prettyPrint(PlanNode root) {
+        return dumpTree(root);
+    }
+
     private static String dumpTree(PlanNode root) {
         StringBuilder sb = new StringBuilder();
         dumpTree(root, sb, 0);
diff --git a/fe/fe-core/src/test/java/org/apache/doris/planner/PlanShapeDsl.java b/fe/fe-core/src/test/java/org/apache/doris/planner/PlanShapeDsl.java
index ce855d5a48e..bc96ca91353 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/planner/PlanShapeDsl.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/planner/PlanShapeDsl.java
@@ -91,6 +91,10 @@ public interface PlanShapeDsl {
         return PlanShape.olapScan(tableName, children);
     }
 
+    default PlanShape<OlapScanNode> olapScan() {
+        return PlanShape.olapScan();
+    }
+
     default PlanShape<ExchangeNode> exchange(PlanShape<?>... children) {
         return PlanShape.exchange(children);
     }
@@ -111,6 +115,14 @@ public interface PlanShapeDsl {
         return PlanShape.union(children);
     }
 
+    default PlanShape<NestedLoopJoinNode> nestedLoopJoin(PlanShape<?>... children) {
+        return PlanShape.nestedLoopJoin(children);
+    }
+
+    default PlanShape<PartitionSortNode> partitionSort(PlanShape<?>... children) {
+        return PlanShape.partitionSort(children);
+    }
+
     // ---- assertion entry points ----
 
     default void assertMatches(PlanNode root, PlanShape<?> shape) {
@@ -120,4 +132,17 @@ public interface PlanShapeDsl {
     default void assertMatchesAnyFragment(List<PlanFragment> fragments, PlanShape<?> shape) {
         PlanShape.assertMatchesAnyFragment(fragments, shape);
     }
+
+    /**
+     * Print all fragments' plan trees to stderr.  Useful for one-off debugging
+     * when writing a new shape assertion: print first, copy the structure into a
+     * {@link PlanShape} pattern, then remove the call.  Not intended to be left
+     * in committed tests.
+     */
+    default void printFragmentPlans(List<PlanFragment> fragments) {
+        for (PlanFragment f : fragments) {
+            System.err.println("=== fragment " + f.getFragmentId() + " ===");
+            System.err.println(PlanShape.prettyPrint(f.getPlanRoot()));
+        }
+    }
 }
diff --git a/fe/fe-core/src/test/java/org/apache/doris/qe/LocalExchangePlannerTest.java b/fe/fe-core/src/test/java/org/apache/doris/qe/LocalExchangePlannerTest.java
index 23170c2b5f5..52536492ef9 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/qe/LocalExchangePlannerTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/qe/LocalExchangePlannerTest.java
@@ -96,28 +96,29 @@ public class LocalExchangePlannerTest extends TestWithFeService implements PlanS
         assertMatchesAnyFragment(planner.getFragments(), shape);
     }
 
-    @Test
-    public void testAggFromScanUsesLocalExecutionHashShuffle() throws Exception {
-        connectContext.getSessionVariable().setEnableLocalShufflePlanner(true);
-        connectContext.getSessionVariable().setEnableLocalShuffle(true);
-        connectContext.getSessionVariable().setEnableNereidsDistributePlanner(true);
-        connectContext.getSessionVariable().setIgnoreStorageDataDistribution(true);
-        connectContext.getSessionVariable().setPipelineTaskNum("4");
-        connectContext.getSessionVariable().setForceToLocalShuffle(false);
-
-        // With the Bug 7 fix, OlapScanNode returns NOOP (no self-wrapping),
-        // so the parent AggregationNode's requireHash() resolves to
-        // LOCAL_EXECUTION_HASH_SHUFFLE (scan child → local hash).
-        StmtExecutor executor = executeNereidsSql(
-                "explain distributed plan select k1, k2, count(*) from test.t1 group by k1, k2");
+    /**
+     * Debug-only helper: run the SQL and dump every fragment's plan tree to stderr.
+     * Use this when crafting a new {@link #assertPlanShape} assertion to see what
+     * the real plan looks like, then replace this call with the proper DSL pattern.
+     */
+    protected void dumpPlan(String sql) throws Exception {
+        StmtExecutor executor = executeNereidsSql("explain distributed plan " + sql);
         NereidsPlanner planner = (NereidsPlanner) executor.planner();
-        EnumSet<LocalExchangeType> types = collectLocalExchangeTypes(planner.getFragments());
-        String explain = collectFragmentExplain(planner.getFragments());
+        System.err.println("=== dump for SQL: " + sql + " ===");
+        printFragmentPlans(planner.getFragments());
+    }
 
-        Assertions.assertTrue(types.contains(LocalExchangeType.LOCAL_EXECUTION_HASH_SHUFFLE),
-                "expected LOCAL_EXECUTION_HASH_SHUFFLE in plan, actual: " + types);
-        Assertions.assertTrue(explain.contains("LOCAL_EXECUTION_HASH_SHUFFLE"),
-                "expected LOCAL_EXECUTION_HASH_SHUFFLE in explain output, actual explain: " + explain);
+    /**
+     * Assert that NO local exchange of the given type appears anywhere in any
+     * fragment's plan tree.  Companion to {@link #assertPlanShape} for negative
+     * checks where pinning the full shape would be brittle.
+     */
+    protected void assertNoLocalExchangeOfType(String sql, LocalExchangeType excludedType) throws Exception {
+        StmtExecutor executor = executeNereidsSql("explain distributed plan " + sql);
+        NereidsPlanner planner = (NereidsPlanner) executor.planner();
+        EnumSet<LocalExchangeType> types = collectLocalExchangeTypes(planner.getFragments());
+        Assertions.assertFalse(types.contains(excludedType),
+                "expected no " + excludedType + " in plan, actual: " + types);
     }
 
     @Test
@@ -144,84 +145,308 @@ public class LocalExchangePlannerTest extends TestWithFeService implements PlanS
     }
 
     @Test
-    public void testNonSerialScanKeepsBucketHashDistribution() throws 
Exception {
-        connectContext.getSessionVariable().setEnableLocalShufflePlanner(true);
-        connectContext.getSessionVariable().setEnableLocalShuffle(true);
-        
connectContext.getSessionVariable().setEnableNereidsDistributePlanner(true);
-        
connectContext.getSessionVariable().setIgnoreStorageDataDistribution(false);
-        connectContext.getSessionVariable().setPipelineTaskNum("1");
-        connectContext.getSessionVariable().setForceToLocalShuffle(false);
-
-        StmtExecutor executor = executeNereidsSql(
-                "explain distributed plan select k1, count(*) from test.t1 
group by k1 order by k1");
-        NereidsPlanner planner = (NereidsPlanner) executor.planner();
-        String explain = collectFragmentExplain(planner.getFragments());
+    public void testAggWithoutKeyTwoPhase() throws Exception {
+        // doc rule "Agg / 没有 groupby key" → PASSTHROUGH.
+        // count(*) generates a two-phase aggregation:
+        //   FINAL AggregationNode → ExchangeNode (UNPARTITIONED, serial)
+        //                         → PARTIAL AggregationNode (serial, no keys)
+        //                         → LE(PASSTHROUGH) (fan-out from 1-task pooling scan)
+        //                         → OlapScan(t1)
+        // The intermediate LE(PT) is the heavy-op fan-out for the pooling scan;
+        // the serial PARTIAL agg sits right above it.
+        setupLocalShuffleSession(null);
+        assertPlanShape("select count(*) from test.t1",
+                anyTree(
+                        agg(
+                                anyTree(
+                                        agg(
+                                                localExchange(PT,
+                                                        olapScan("t1")))))));
+    }
 
-        
Assertions.assertFalse(explain.contains("LOCAL_EXECUTION_HASH_SHUFFLE"),
-                "non-serial scan should keep BUCKET_HASH_SHUFFLE output and 
avoid local hash exchange, explain: "
-                        + explain);
+    @Test
+    public void testBroadcastJoinPoolingShapeDsl() throws Exception {
+        // doc rule "HashJoin / BROADCAST / 池化":
+        //   probe ← LE(PASSTHROUGH) ← scan
+        //   build ← LE(PASS_TO_ONE) ← Exchange (cross-fragment broadcast)
+        // Matches doc Example 2 plus the pooling variant (PASS_TO_ONE instead of NOOP
+        // on the serial build-side exchange).
+        setupLocalShuffleSession(null);
+        assertPlanShape("select * from test.t1 a join [broadcast] test.t2 b on a.k1=b.k1",
+                anyTree(
+                        hashJoin(
+                                localExchange(PT,
+                                        olapScan()),
+                                localExchange(PASS_TO_ONE_LE,
+                                        anyTree(exchange())))));
     }
 
     @Test
-    public void testJoinPlanContainsHashShuffle() throws Exception {
-        connectContext.getSessionVariable().setEnableLocalShufflePlanner(true);
-        connectContext.getSessionVariable().setEnableLocalShuffle(true);
-        
connectContext.getSessionVariable().setEnableNereidsDistributePlanner(true);
-        
connectContext.getSessionVariable().setIgnoreStorageDataDistribution(true);
-        connectContext.getSessionVariable().setPipelineTaskNum("4");
-        connectContext.getSessionVariable().setForceToLocalShuffle(false);
-
-        StmtExecutor executor = executeNereidsSql(
-                "explain distributed plan select a.k1, count(*) "
-                        + "from test.t1 a join test.t2 b on a.k1 = b.k1 group 
by a.k1");
-        NereidsPlanner planner = (NereidsPlanner) executor.planner();
-        EnumSet<LocalExchangeType> types = 
collectLocalExchangeTypes(planner.getFragments());
+    public void testNlJoinPoolingShapeDsl() throws Exception {
+        // doc rule "NL join / 池化": build BROADCAST, probe 
ADAPTIVE_PASSTHROUGH.
+        // With pooling, the probe-side scan is serial (1 task) so the 
framework
+        // additionally inserts an inner LE(PASSTHROUGH) below the 
ADAPTIVE_PASSTHROUGH
+        // to fan the serial scan out before adaptive redistribution.  The 
build side
+        // comes through a cross-fragment Exchange (broadcast).
+        setupLocalShuffleSession(null);
+        assertPlanShape("select * from test.t1 a, test.t2 b where a.k1 > b.k1",
+                anyTree(
+                        nestedLoopJoin(
+                                localExchange(ADAPTIVE_PT,
+                                        localExchange(PT,
+                                                anyTree(olapScan()))),
+                                localExchange(BROADCAST_LE,
+                                        anyTree(exchange())))));
+    }
 
-        // With pooling scan and local shuffle planner, hash exchanges should 
be present
-        boolean hasHashShuffle = types.stream().anyMatch(t -> 
t.isHashShuffle());
-        Assertions.assertTrue(hasHashShuffle || 
types.contains(LocalExchangeType.PASSTHROUGH),
-                "expected hash shuffle or passthrough in plan, actual: " + 
types);
+    @Test
+    public void testNullAwareLeftAntiJoinHasNoLocalExchange() throws Exception 
{
+        // doc rule "HashJoin / NULL_AWARE_LEFT_ANTI_JOIN": NOOP/NOOP/NOOP — 
no LE
+        // inserted on either side, in either direction.
+        //   HashJoin(NULL_AWARE_LEFT_ANTI_JOIN)
+        //     ← OlapScan(t1)             (probe, no LE)
+        //     ← Exchange ← OlapScan(t2)  (build, no LE)
+        setupLocalShuffleSession(null);
+        assertPlanShape("select k1 from test.t1 where k1 not in (select k1 
from test.t2)",
+                anyTree(
+                        hashJoin(
+                                olapScan(),
+                                anyTree(exchange()))
+                                .where(j -> j.getJoinOp()
+                                        == org.apache.doris.analysis.JoinOperator.NULL_AWARE_LEFT_ANTI_JOIN)));
     }
 
     @Test
-    public void testNoopLocalExchangeNotInjected() throws Exception {
-        connectContext.getSessionVariable().setEnableLocalShufflePlanner(true);
-        connectContext.getSessionVariable().setEnableLocalShuffle(true);
-        
connectContext.getSessionVariable().setEnableNereidsDistributePlanner(true);
-        connectContext.getSessionVariable().setForceToLocalShuffle(false);
+    public void testAnalyticNoPartitionByHasNoLocalExchange() throws Exception 
{
+        // doc rule "Analytic / 无 partition by": serial path with PASSTHROUGH 
require
+        // upstream.  Because the analytic fragment uses a cross-fragment 
UNPARTITIONED
+        // Exchange to gather data into a single instance, no LE is inserted 
between
+        // the AnalyticEvalNode and the Exchange — the Exchange already serves 
as the
+        // serial source.
+        setupLocalShuffleSession(null);
+        assertPlanShape("select k1, row_number() over () from test.t1",
+                anyTree(
+                        analytic(
+                                anyTree(exchange()))));
+    }
 
-        StmtExecutor executor = executeNereidsSql("explain distributed plan 
select * from test.t1 limit 1");
-        NereidsPlanner planner = (NereidsPlanner) executor.planner();
-        EnumSet<LocalExchangeType> types = 
collectLocalExchangeTypes(planner.getFragments());
+    // -- Tier A: scenarios borrowed from Trino's TestAddExchangesPlans --
+
+    @Test
+    public void testUnionDistinctTwoPhaseAgg() throws Exception {
+        // Borrowed from Trino's testRepartitionForUnionWithAnyTableScans.
+        // UNION (not UNION ALL) implies DISTINCT — Doris realises that with a
+        // two-phase Aggregation above the UnionNode.  The cross-fragment 
Exchanges
+        // feeding the Union pre-shuffle the scan outputs; the LE(PT) above the
+        // Union is the heavy-op fan-out for the partial agg over the gathered 
union.
+        //   FINAL Agg ← Exchange ← PARTIAL Agg ← LE(PT) ← Union ← {Exchange ← 
scan} x 2
+        setupLocalShuffleSession(null);
+        assertPlanShape("select k1 from test.t1 union select k1 from test.t2",
+                anyTree(
+                        agg(
+                                anyTree(
+                                        agg(
+                                                localExchange(PT,
+                                                        union(
+                                                                
anyTree(olapScan()),
+                                                                
anyTree(olapScan()))))))));
+    }
 
-        Assertions.assertFalse(types.contains(LocalExchangeType.NOOP),
-                "NOOP local exchange should not be materialized as a node");
+    @Test
+    public void testUnionAllBeforeHashJoin() throws Exception {
+        // Borrowed from Trino's testRepartitionForUnionAllBeforeHashJoin.
+        // UNION ALL feeds into a hash join — the join's hash requirement is
+        // satisfied by the cross-fragment Exchanges sitting under each Union 
branch
+        // (data is already hash-distributed by the time it reaches Union), so 
no
+        // intra-fragment LE is needed on either probe or build side.
+        setupLocalShuffleSession(null);
+        assertPlanShape("select * from (select k1 from test.t1 union all 
select k1 from test.t2) u "
+                        + "join test.t1 t3 on u.k1=t3.k1",
+                anyTree(
+                        hashJoin(
+                                union(
+                                        anyTree(olapScan()),
+                                        anyTree(olapScan())),
+                                anyTree(exchange()))));
     }
 
     @Test
-    public void testHashShuffleHasDistributeExprs() throws Exception {
-        connectContext.getSessionVariable().setEnableLocalShufflePlanner(true);
-        connectContext.getSessionVariable().setEnableLocalShuffle(true);
-        
connectContext.getSessionVariable().setEnableNereidsDistributePlanner(true);
-        
connectContext.getSessionVariable().setIgnoreStorageDataDistribution(true);
-        connectContext.getSessionVariable().setPipelineTaskNum("4");
-        connectContext.getSessionVariable().setForceToLocalShuffle(false);
-
-        // Use a simple agg query that reliably produces hash local exchange
-        StmtExecutor executor = executeNereidsSql(
-                "explain distributed plan select k1, k2, count(*) from test.t1 
group by k1, k2");
-        NereidsPlanner planner = (NereidsPlanner) executor.planner();
-        List<LocalExchangeNode> localExchanges = 
collectLocalExchangeNodes(planner.getFragments());
+    public void testWindowPartitionByBucketKey() throws Exception {
+        // Borrowed from Trino's testWindowIsExactlyPartitioned.
+        // PARTITION BY uses the table's bucket key (k1) — Doris's analytic 
eval
+        // is colocate-eligible.  With pooling, the chain is:
+        //   AnalyticEval ← Sort ← LE(LOCAL_HASH) ← LE(PT) ← scan
+        // The inner LE(PT) is the heavy-op fan-out for the serial pooling 
scan.
+        setupLocalShuffleSession(null);
+        assertPlanShape("select k1, row_number() over (partition by k1) from 
test.t1",
+                anyTree(
+                        analytic(
+                                sort(
+                                        localExchange(LOCAL_HASH,
+                                                localExchange(PT,
+                                                        olapScan("t1")))))));
+    }
+
+    @Test
+    public void testWindowPartitionByNonBucketKey() throws Exception {
+        // Borrowed from Trino's testRowNumberIsExactlyPartitioned (negative 
variant).
+        // PARTITION BY uses a non-bucket key (k2) — colocate is not eligible, 
so
+        // the analytic eval lives in its own fragment fed by a cross-fragment
+        // hash-partitioned Exchange.  No intra-fragment LE inside the 
analytic fragment.
+        setupLocalShuffleSession(null);
+        assertPlanShape("select k1, row_number() over (partition by k2) from 
test.t1",
+                anyTree(
+                        analytic(
+                                sort(
+                                        anyTree(exchange())))));
+    }
+
+    @Test
+    public void testNestedUnionAll() throws Exception {
+        // Borrowed from Trino's testNestedUnionAll.
+        // Three-way UNION ALL flattens into a single UnionNode with three
+        // cross-fragment Exchange children.  No LE since there's no downstream
+        // consumer requiring hash distribution.
+        setupLocalShuffleSession(null);
+        assertPlanShape(
+                "select k1 from test.t1 union all "
+                        + "(select k1 from test.t2 union all select k1 from 
test.t1)",
+                anyTree(
+                        union(
+                                anyTree(olapScan()),
+                                anyTree(olapScan()),
+                                anyTree(olapScan()))));
+    }
+
+    @Test
+    public void testGroupedAggOverNlj() throws Exception {
+        // Borrowed from Trino's testGroupedAggregationAboveUnionAllCrossJoined
+        // (NLJ + agg variant).  NLJ output is ADAPTIVE_PASSTHROUGH; the outer 
Agg
+        // requires HASH on k1.  Because ADAPTIVE_PASSTHROUGH does not satisfy 
HASH,
+        // an LE(LOCAL_HASH) is inserted between the NLJ output and the Agg.
+        //   Agg ← LE(LOCAL_HASH) ← NLJ
+        //                            ├─ LE(ADAPTIVE_PT) ← LE(PT) ← scan(t1)
+        //                            └─ LE(BROADCAST) ← Exchange ← scan(t2)
+        setupLocalShuffleSession(null);
+        assertPlanShape("select a.k1, count(*) from test.t1 a, test.t2 b where 
a.k1 > b.k1 group by a.k1",
+                anyTree(
+                        agg(
+                                localExchange(LOCAL_HASH,
+                                        nestedLoopJoin(
+                                                localExchange(ADAPTIVE_PT,
+                                                        localExchange(PT,
+                                                                
anyTree(olapScan()))),
+                                                localExchange(BROADCAST_LE,
+                                                        
anyTree(exchange())))))));
+    }
+
+    @Test
+    public void testTopNQualifyPartitionSort() throws Exception {
+        // Borrowed from Trino's testTopNRowNumberIsExactlyPartitioned.
+        // ROW_NUMBER() filtered by `rn = 1` triggers Doris's PartitionSortNode
+        // optimisation (LOCAL phase pre-trims rows before the global 
Analytic).
+        // The chain becomes:
+        //   AnalyticEval ← Sort ← LE(LOCAL_HASH) ← PartitionSort ← LE(PT) ← 
scan
+        // The LE(PT) under PartitionSort is the heavy-op fan-out for the 
pooling scan;
+        // the LE(LOCAL_HASH) above PartitionSort enforces hash partitioning 
for the
+        // global ROW_NUMBER computation.
+        setupLocalShuffleSession(null);
+        assertPlanShape(
+                "select k1, k2 from (select k1, k2, row_number() over 
(partition by k1 order by k2) rn "
+                        + "from test.t1) t where rn = 1",
+                anyTree(
+                        analytic(
+                                sort(
+                                        localExchange(LOCAL_HASH,
+                                                partitionSort(
+                                                        localExchange(PT,
+                                                                
olapScan("t1"))))))));
+    }
+
+    @Test
+    public void testAggOverBroadcastJoin() throws Exception {
+        // Borrowed from Trino's testGroupedAggregationAboveUnionAll variant.
+        // count(*) over a broadcast join generates a two-phase aggregation; 
the
+        // partial agg sits directly on top of the HashJoin and the final agg 
lives
+        // in a separate fragment (count merge):
+        //   FINAL Agg ← Exchange ← PARTIAL Agg ← HashJoin
+        //                                            ├─ LE(PT) ← scan
+        //                                            └─ LE(PASS_TO_ONE) ← Exchange ← scan
+        setupLocalShuffleSession(null);
+        assertPlanShape("select count(*) from test.t1 a join [broadcast] 
test.t2 b on a.k1=b.k1",
+                anyTree(
+                        agg(
+                                anyTree(
+                                        agg(
+                                                hashJoin(
+                                                        localExchange(PT,
+                                                                olapScan()),
+                                                        
localExchange(PASS_TO_ONE_LE,
+                                                                
anyTree(exchange()))))))));
+    }
+
+    @Test
+    public void testNonSerialScanKeepsBucketHashDistribution() throws 
Exception {
+        // Non-pooling scan with pipelineTaskNum=1 → the BUCKET_HASH_SHUFFLE 
output of
+        // the colocated scan is preserved end-to-end; no 
LOCAL_EXECUTION_HASH_SHUFFLE
+        // is ever introduced.  Only a serial-source PASSTHROUGH appears (for 
the
+        // SortNode's merge-by-exchange).
+        setupLocalShuffleSession(sv -> {
+            sv.setIgnoreStorageDataDistribution(false);
+            try {
+                sv.setPipelineTaskNum("1");
+            } catch (Exception e) {
+                throw new RuntimeException(e);
+            }
+        });
+        assertNoLocalExchangeOfType(
+                "select k1, count(*) from test.t1 group by k1 order by k1",
+                LocalExchangeType.LOCAL_EXECUTION_HASH_SHUFFLE);
+    }
+
+    @Test
+    public void testJoinPlanContainsHashShuffle() throws Exception {
+        // Pooling hash join under an aggregate.  Both sides of the join feed 
through
+        // local exchanges; the agg above the join requires 
LOCAL_EXECUTION_HASH_SHUFFLE.
+        //   Agg → LE(LOCAL_HASH) → HashJoin
+        //                            ← LE(PASSTHROUGH) ← OlapScan(t1)  (probe)
+        //                            ← LE(PASS_TO_ONE) ← Exchange ← OlapScan(t2)  (build)
+        setupLocalShuffleSession(null);
+        assertPlanShape(
+                "select a.k1, count(*) from test.t1 a join test.t2 b on a.k1 = 
b.k1 group by a.k1",
+                anyTree(
+                        agg(
+                                localExchange(LOCAL_HASH,
+                                        hashJoin(
+                                                localExchange(PT,
+                                                        olapScan("t1")),
+                                                localExchange(PASS_TO_ONE_LE,
+                                                        
anyTree(exchange())))))));
+    }
 
-        boolean hasHashShuffleWithExpr = localExchanges.stream().anyMatch(node 
->
-                node.getExchangeType().isHashShuffle()
-                && node.getDistributeExprLists() != null
-                && !node.getDistributeExprLists().isEmpty());
-        String exchangeInfo = localExchanges.stream()
-                .map(n -> n.getExchangeType() + "(exprs=" + 
n.getDistributeExprLists() + ")")
-                .collect(java.util.stream.Collectors.joining(", "));
-        Assertions.assertTrue(hasHashShuffleWithExpr,
-                "expected at least one hash local exchange with distribute 
exprs, found: " + exchangeInfo);
+    @Test
+    public void testNoopLocalExchangeNotInjected() throws Exception {
+        // A simple LIMIT scan plan should contain no local exchanges of any 
kind —
+        // and most importantly, no synthesized NOOP node.  doc rule "NOOP is 
meta,
+        // never materialized": the planner uses NOOP as a 'skip' signal during
+        // resolution but never instantiates a LocalExchangeNode with type 
NOOP.
+        setupLocalShuffleSession(null);
+        assertNoLocalExchangeOfType("select * from test.t1 limit 1", 
LocalExchangeType.NOOP);
+    }
+
+    @Test
+    public void testHashShuffleHasDistributeExprs() throws Exception {
+        // Same scan→agg plan as testAggFromScanShapeDsl, but with a predicate 
that
+        // checks the inserted LE(LOCAL_HASH) carries non-empty distribute 
expressions
+        // (without which the BE would not know which columns to hash on).
+        setupLocalShuffleSession(null);
+        assertPlanShape("select k1, k2, count(*) from test.t1 group by k1, k2",
+                anyTree(
+                        agg(
+                                localExchange(LOCAL_HASH,
+                                        localExchange(PT, olapScan("t1")))
+                                        .where(le -> 
le.getDistributeExprLists() != null
+                                                && 
!le.getDistributeExprLists().isEmpty()))));
     }
 
     @Test
@@ -235,70 +460,49 @@ public class LocalExchangePlannerTest extends TestWithFeService implements PlanS
 
     @Test
     public void testSetOperationUnderAggHasHashShuffle() throws Exception {
-        connectContext.getSessionVariable().setEnableLocalShufflePlanner(true);
-        connectContext.getSessionVariable().setEnableLocalShuffle(true);
-        
connectContext.getSessionVariable().setEnableNereidsDistributePlanner(true);
-        
connectContext.getSessionVariable().setIgnoreStorageDataDistribution(false);
-        connectContext.getSessionVariable().setPipelineTaskNum("4");
-        connectContext.getSessionVariable().setForceToLocalShuffle(false);
-
-        StmtExecutor executor = executeNereidsSql(
-                "explain distributed plan select k1, count(*) from ("
-                        + "select k1 from test.t1 union all select k1 from 
test.t2"
-                        + ") u group by k1");
-        NereidsPlanner planner = (NereidsPlanner) executor.planner();
-        EnumSet<LocalExchangeType> types = 
collectLocalExchangeTypes(planner.getFragments());
-
-        // With non-pooling scan and colocated bucket distribution, local 
exchanges may
-        // not be inserted. Verify plan at least doesn't crash and contains 
valid exchange types.
-        boolean hasLocalExchange = !types.isEmpty();
-        // If local exchanges are present, they should include hash shuffle 
types
-        if (hasLocalExchange) {
-            boolean hasHashShuffle = types.stream().anyMatch(t -> 
t.isHashShuffle());
-            Assertions.assertTrue(hasHashShuffle,
-                    "expected hash shuffle in set-operation plan when 
exchanges present, actual: " + types);
-        }
+        // Non-pooling UNION ALL under an agg.  The outer agg's group key 
requires a
+        // LOCAL_EXECUTION_HASH_SHUFFLE directly above the UnionNode (above 
each
+        // branch's pre-agg).
+        setupLocalShuffleSession(sv -> 
sv.setIgnoreStorageDataDistribution(false));
+        assertPlanShape(
+                "select k1, count(*) from (select k1 from test.t1 union all 
select k1 from test.t2) u group by k1",
+                anyTree(
+                        agg(
+                                localExchange(LOCAL_HASH,
+                                        union(
+                                                anyTree(olapScan()),
+                                                anyTree(olapScan()))))));
     }
 
     @Test
     public void testAnalyticPlanContainsPassthroughAndLocalHashShuffle() 
throws Exception {
-        connectContext.getSessionVariable().setEnableLocalShufflePlanner(true);
-        connectContext.getSessionVariable().setEnableLocalShuffle(true);
-        
connectContext.getSessionVariable().setEnableNereidsDistributePlanner(true);
-        
connectContext.getSessionVariable().setIgnoreStorageDataDistribution(true);
-        connectContext.getSessionVariable().setPipelineTaskNum("4");
-        connectContext.getSessionVariable().setForceToLocalShuffle(false);
-
-        StmtExecutor executor = executeNereidsSql(
-                "explain distributed plan select k1, k2, row_number() 
over(partition by k1 order by k2) "
-                        + "from test.t1 order by k1, k2");
-        NereidsPlanner planner = (NereidsPlanner) executor.planner();
-        EnumSet<LocalExchangeType> types = 
collectLocalExchangeTypes(planner.getFragments());
-
-        // Analytic plan: mergeByExchange sort inserts PASSTHROUGH.
-        // With pooling scan (ignore_storage_data_distribution=true), hash or 
passthrough exchanges expected.
-        Assertions.assertTrue(types.contains(LocalExchangeType.PASSTHROUGH),
-                "expected PASSTHROUGH in analytic plan, actual: " + types);
+        // doc rule "Analytic / 有 partition by / 池化": LE(LOCAL_HASH) for 
partition
+        // redistribution, plus a LE(PASSTHROUGH) heavy-op fan-out below it 
for the
+        // 1-task pooling scan, then LE(PASSTHROUGH) above the AnalyticEval 
for the
+        // outer order-by merge.
+        //   SortNode → LE(PASSTHROUGH) → AnalyticEval → SortNode
+        //                                     → LE(LOCAL_HASH) → LE(PASSTHROUGH) → scan
+        setupLocalShuffleSession(null);
+        assertPlanShape(
+                "select k1, k2, row_number() over(partition by k1 order by k2) 
from test.t1 order by k1, k2",
+                anyTree(
+                        sort(
+                                localExchange(PT,
+                                        analytic(
+                                                sort(
+                                                        
localExchange(LOCAL_HASH,
+                                                                
localExchange(PT,
+                                                                        
olapScan("t1")))))))));
     }
 
     @Test
     public void testGroupingSetsPlanContainsHashShuffle() throws Exception {
-        connectContext.getSessionVariable().setEnableLocalShufflePlanner(true);
-        connectContext.getSessionVariable().setEnableLocalShuffle(true);
-        
connectContext.getSessionVariable().setEnableNereidsDistributePlanner(true);
-        
connectContext.getSessionVariable().setIgnoreStorageDataDistribution(false);
-        connectContext.getSessionVariable().setPipelineTaskNum("4");
-        connectContext.getSessionVariable().setForceToLocalShuffle(false);
-
-        StmtExecutor executor = executeNereidsSql(
-                "explain distributed plan select k1, k2, sum(v1) from test.t1 "
-                        + "group by grouping sets((k1), (k1, k2))");
-        NereidsPlanner planner = (NereidsPlanner) executor.planner();
-        EnumSet<LocalExchangeType> types = 
collectLocalExchangeTypes(planner.getFragments());
-
-        
Assertions.assertFalse(types.contains(LocalExchangeType.LOCAL_EXECUTION_HASH_SHUFFLE),
-                "grouping-sets plan should not force local execution hash 
shuffle when scan keeps bucket"
-                        + " distribution, actual: " + types);
+        // Despite the test name, this is a negative check: non-pooling grouping sets keep the
+        // scan's colocated BUCKET_HASH_SHUFFLE output through Repeat→Agg, so no LE(LOCAL_HASH) is needed.
+        setupLocalShuffleSession(sv -> 
sv.setIgnoreStorageDataDistribution(false));
+        assertNoLocalExchangeOfType(
+                "select k1, k2, sum(v1) from test.t1 group by grouping 
sets((k1), (k1, k2))",
+                LocalExchangeType.LOCAL_EXECUTION_HASH_SHUFFLE);
     }
 
     @Test
@@ -329,23 +533,39 @@ public class LocalExchangePlannerTest extends 
TestWithFeService implements PlanS
 
     @Test
     public void testMixedPlanWithPoolingScan() throws Exception {
-        connectContext.getSessionVariable().setEnableLocalShufflePlanner(true);
-        connectContext.getSessionVariable().setEnableLocalShuffle(true);
-        
connectContext.getSessionVariable().setEnableNereidsDistributePlanner(true);
-        
connectContext.getSessionVariable().setIgnoreStorageDataDistribution(true);
-        connectContext.getSessionVariable().setPipelineTaskNum("4");
-        connectContext.getSessionVariable().setForceToLocalShuffle(false);
-
-        StmtExecutor executor = executeNereidsSql(
-                "explain distributed plan select u.k1, count(*) from ("
-                        + "select k1, k2 from test.t1 group by grouping 
sets((k1), (k1, k2))"
-                        + ") u join test.t2 b on u.k1 = b.k1 group by u.k1");
-        NereidsPlanner planner = (NereidsPlanner) executor.planner();
-        EnumSet<LocalExchangeType> types = 
collectLocalExchangeTypes(planner.getFragments());
+        // Pooling: grouping-sets sub-query JOINed and re-aggregated.  On the probe
+        // side, LE(LOCAL_HASH) over LE(PASSTHROUGH) sits between the inner Agg and the
+        // Repeat; the build side comes through LE(PASS_TO_ONE) over an inter-fragment Exchange.
+        //   Agg → HashJoin
+        //           ← Agg → LE(LOCAL_HASH) → LE(PASSTHROUGH) → Repeat → 
scan(t1)
+        //           ← LE(PASS_TO_ONE) → Exchange → scan(t2)
+        setupLocalShuffleSession(null);
+        assertPlanShape(
+                "select u.k1, count(*) from (select k1, k2 from test.t1 group 
by grouping sets((k1), (k1, k2))) u "
+                        + "join test.t2 b on u.k1 = b.k1 group by u.k1",
+                anyTree(
+                        agg(
+                                hashJoin(
+                                        agg(
+                                                localExchange(LOCAL_HASH,
+                                                        localExchange(PT,
+                                                                
anyTree(olapScan("t1"))))),
+                                        localExchange(PASS_TO_ONE_LE,
+                                                anyTree(exchange()))))));
+    }
 
-        // With pooling scan, local exchanges should be present
-        Assertions.assertFalse(types.isEmpty(),
-                "expected local exchanges in mixed plan with pooling scan, 
actual: " + types);
+    @Test
+    public void testUnionAllScanAndValues() throws Exception {
+        // Tier B (borrowed from Trino): UNION ALL of a real OlapScan and inline
+        // VALUES rows.  The values branches flow through their own fragments and
+        // land at a UnionNode that gathers via Exchange.  Expected: the scan-side
+        // LE(PASSTHROUGH) heavy-op fan-out is still inserted and the values branches add
+        // no local exchanges (already serial sources); the shape below pins only Union → Exchange.
+        setupLocalShuffleSession(null);
+        assertPlanShape("select k1 from test.t1 union all select 1 union all 
select 2",
+                anyTree(
+                        union(
+                                anyTree(exchange()))));
     }
 
     private EnumSet<LocalExchangeType> 
collectLocalExchangeTypes(List<PlanFragment> fragments) {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to