This is an automated email from the ASF dual-hosted git repository. 924060929 pushed a commit to branch fe_local_shuffle_rebase_wip in repository https://gitbox.apache.org/repos/asf/doris.git
commit 383ea3f6c8065544decfe20d497425abc33b34a8 Author: 924060929 <[email protected]> AuthorDate: Tue May 19 17:24:08 2026 +0800 [test](local shuffle) Rewrite LocalExchangePlannerTest with PlanShape DSL - Delete duplicate testAggFromScanUsesLocalExecutionHashShuffle - Rewrite 8 substring-based tests as DSL shape assertions - Add testUnionAllScanAndValues (Tier B from Trino) - Add assertNoLocalExchangeOfType helper for negative checks - Add nestedLoopJoin/partitionSort/olapScan() factories to PlanShape(Dsl) --- .../java/org/apache/doris/planner/PlanShape.java | 21 + .../org/apache/doris/planner/PlanShapeDsl.java | 25 + .../apache/doris/qe/LocalExchangePlannerTest.java | 532 +++++++++++++++------ 3 files changed, 422 insertions(+), 156 deletions(-) diff --git a/fe/fe-core/src/test/java/org/apache/doris/planner/PlanShape.java b/fe/fe-core/src/test/java/org/apache/doris/planner/PlanShape.java index 944825fdb38..39022c4283a 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/planner/PlanShape.java +++ b/fe/fe-core/src/test/java/org/apache/doris/planner/PlanShape.java @@ -133,6 +133,10 @@ public final class PlanShape<T extends PlanNode> { "OlapScan(" + tableName + ")"); } + public static PlanShape<OlapScanNode> olapScan() { + return node(OlapScanNode.class); + } + public static PlanShape<ExchangeNode> exchange(PlanShape<?>... children) { return node(ExchangeNode.class, children); } @@ -153,6 +157,14 @@ public final class PlanShape<T extends PlanNode> { return node(UnionNode.class, children); } + public static PlanShape<NestedLoopJoinNode> nestedLoopJoin(PlanShape<?>... children) { + return node(NestedLoopJoinNode.class, children); + } + + public static PlanShape<PartitionSortNode> partitionSort(PlanShape<?>... children) { + return node(PartitionSortNode.class, children); + } + // ---- chained predicate ---- /** @@ -265,6 +277,15 @@ public final class PlanShape<T extends PlanNode> { return sb.toString(); } + /** + * Render a plan tree as a text outline (one node per line, indented by depth). + * Useful for debugging when writing new shape assertions — print the actual + * plan, then copy the structure into a {@code PlanShape} pattern. + */ + public static String prettyPrint(PlanNode root) { + return dumpTree(root); + } + private static String dumpTree(PlanNode root) { StringBuilder sb = new StringBuilder(); dumpTree(root, sb, 0); diff --git a/fe/fe-core/src/test/java/org/apache/doris/planner/PlanShapeDsl.java b/fe/fe-core/src/test/java/org/apache/doris/planner/PlanShapeDsl.java index ce855d5a48e..bc96ca91353 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/planner/PlanShapeDsl.java +++ b/fe/fe-core/src/test/java/org/apache/doris/planner/PlanShapeDsl.java @@ -91,6 +91,10 @@ public interface PlanShapeDsl { return PlanShape.olapScan(tableName, children); } + default PlanShape<OlapScanNode> olapScan() { + return PlanShape.olapScan(); + } + default PlanShape<ExchangeNode> exchange(PlanShape<?>... children) { return PlanShape.exchange(children); } @@ -111,6 +115,14 @@ public interface PlanShapeDsl { return PlanShape.union(children); } + default PlanShape<NestedLoopJoinNode> nestedLoopJoin(PlanShape<?>... children) { + return PlanShape.nestedLoopJoin(children); + } + + default PlanShape<PartitionSortNode> partitionSort(PlanShape<?>... children) { + return PlanShape.partitionSort(children); + } + // ---- assertion entry points ---- default void assertMatches(PlanNode root, PlanShape<?> shape) { @@ -120,4 +132,17 @@ public interface PlanShapeDsl { default void assertMatchesAnyFragment(List<PlanFragment> fragments, PlanShape<?> shape) { PlanShape.assertMatchesAnyFragment(fragments, shape); } + + /** + * Print all fragments' plan trees to stderr. Useful for one-off debugging + * when writing a new shape assertion: print first, copy the structure into a + * {@link PlanShape} pattern, then remove the call. Not intended to be left + * in committed tests. + */ + default void printFragmentPlans(List<PlanFragment> fragments) { + for (PlanFragment f : fragments) { + System.err.println("=== fragment " + f.getFragmentId() + " ==="); + System.err.println(PlanShape.prettyPrint(f.getPlanRoot())); + } + } } diff --git a/fe/fe-core/src/test/java/org/apache/doris/qe/LocalExchangePlannerTest.java b/fe/fe-core/src/test/java/org/apache/doris/qe/LocalExchangePlannerTest.java index 23170c2b5f5..52536492ef9 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/qe/LocalExchangePlannerTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/qe/LocalExchangePlannerTest.java @@ -96,28 +96,29 @@ public class LocalExchangePlannerTest extends TestWithFeService implements PlanS assertMatchesAnyFragment(planner.getFragments(), shape); } - @Test - public void testAggFromScanUsesLocalExecutionHashShuffle() throws Exception { - connectContext.getSessionVariable().setEnableLocalShufflePlanner(true); - connectContext.getSessionVariable().setEnableLocalShuffle(true); - connectContext.getSessionVariable().setEnableNereidsDistributePlanner(true); - connectContext.getSessionVariable().setIgnoreStorageDataDistribution(true); - connectContext.getSessionVariable().setPipelineTaskNum("4"); - connectContext.getSessionVariable().setForceToLocalShuffle(false); - - // With the Bug 7 fix, OlapScanNode returns NOOP (no self-wrapping), - // so the parent AggregationNode's requireHash() resolves to - // LOCAL_EXECUTION_HASH_SHUFFLE (scan child → local hash). - StmtExecutor executor = executeNereidsSql( - "explain distributed plan select k1, k2, count(*) from test.t1 group by k1, k2"); + /** + * Debug-only helper: run the SQL and dump every fragment's plan tree to stderr. + * Use this when crafting a new {@link #assertPlanShape} assertion to see what + * the real plan looks like, then replace this call with the proper DSL pattern. + */ + protected void dumpPlan(String sql) throws Exception { + StmtExecutor executor = executeNereidsSql("explain distributed plan " + sql); NereidsPlanner planner = (NereidsPlanner) executor.planner(); - EnumSet<LocalExchangeType> types = collectLocalExchangeTypes(planner.getFragments()); - String explain = collectFragmentExplain(planner.getFragments()); + System.err.println("=== dump for SQL: " + sql + " ==="); + printFragmentPlans(planner.getFragments()); + } - Assertions.assertTrue(types.contains(LocalExchangeType.LOCAL_EXECUTION_HASH_SHUFFLE), - "expected LOCAL_EXECUTION_HASH_SHUFFLE in plan, actual: " + types); - Assertions.assertTrue(explain.contains("LOCAL_EXECUTION_HASH_SHUFFLE"), - "expected LOCAL_EXECUTION_HASH_SHUFFLE in explain output, actual explain: " + explain); + /** + * Assert that NO local exchange of the given type appears anywhere in any + * fragment's plan tree. Companion to {@link #assertPlanShape} for negative + * checks where pinning the full shape would be brittle. + */ + protected void assertNoLocalExchangeOfType(String sql, LocalExchangeType excludedType) throws Exception { + StmtExecutor executor = executeNereidsSql("explain distributed plan " + sql); + NereidsPlanner planner = (NereidsPlanner) executor.planner(); + EnumSet<LocalExchangeType> types = collectLocalExchangeTypes(planner.getFragments()); + Assertions.assertFalse(types.contains(excludedType), + "expected no " + excludedType + " in plan, actual: " + types); } @Test @@ -144,84 +145,308 @@ public class LocalExchangePlannerTest extends TestWithFeService implements PlanS } @Test - public void testNonSerialScanKeepsBucketHashDistribution() throws Exception { - connectContext.getSessionVariable().setEnableLocalShufflePlanner(true); - connectContext.getSessionVariable().setEnableLocalShuffle(true); - connectContext.getSessionVariable().setEnableNereidsDistributePlanner(true); - connectContext.getSessionVariable().setIgnoreStorageDataDistribution(false); - connectContext.getSessionVariable().setPipelineTaskNum("1"); - connectContext.getSessionVariable().setForceToLocalShuffle(false); - - StmtExecutor executor = executeNereidsSql( - "explain distributed plan select k1, count(*) from test.t1 group by k1 order by k1"); - NereidsPlanner planner = (NereidsPlanner) executor.planner(); - String explain = collectFragmentExplain(planner.getFragments()); + public void testAggWithoutKeyTwoPhase() throws Exception { + // doc rule "Agg / 没有 groupby key" → PASSTHROUGH. + // count(*) generates a two-phase aggregation: + // FINAL AggregationNode → ExchangeNode (UNPARTITIONED, serial) + // → PARTIAL AggregationNode (serial, no keys) + // → LE(PASSTHROUGH) (fan-out from 1-task pooling scan) + // → OlapScan(t1) + // The intermediate LE(PT) is the heavy-op fan-out for the pooling scan; + // the serial PARTIAL agg sits right above it. + setupLocalShuffleSession(null); + assertPlanShape("select count(*) from test.t1", + anyTree( + agg( + anyTree( + agg( + localExchange(PT, + olapScan("t1"))))))); + } - Assertions.assertFalse(explain.contains("LOCAL_EXECUTION_HASH_SHUFFLE"), - "non-serial scan should keep BUCKET_HASH_SHUFFLE output and avoid local hash exchange, explain: " - + explain); + @Test + public void testBroadcastJoinPoolingShapeDsl() throws Exception { + // doc rule "HashJoin / BROADCAST / 池化": + // probe ← LE(PASSTHROUGH) ← scan + // build ← LE(PASS_TO_ONE) ← Exchange (cross-fragment broadcast) + // Matches doc Example 2 plus the pooling variant (PASS_TO_ONE instead of NOOP + // on the serial build-side exchange). + setupLocalShuffleSession(null); + assertPlanShape("select * from test.t1 a join [broadcast] test.t2 b on a.k1=b.k1", + anyTree( + hashJoin( + localExchange(PT, + olapScan()), + localExchange(PASS_TO_ONE_LE, + anyTree(exchange()))))); } @Test - public void testJoinPlanContainsHashShuffle() throws Exception { - connectContext.getSessionVariable().setEnableLocalShufflePlanner(true); - connectContext.getSessionVariable().setEnableLocalShuffle(true); - connectContext.getSessionVariable().setEnableNereidsDistributePlanner(true); - connectContext.getSessionVariable().setIgnoreStorageDataDistribution(true); - connectContext.getSessionVariable().setPipelineTaskNum("4"); - connectContext.getSessionVariable().setForceToLocalShuffle(false); - - StmtExecutor executor = executeNereidsSql( - "explain distributed plan select a.k1, count(*) " - + "from test.t1 a join test.t2 b on a.k1 = b.k1 group by a.k1"); - NereidsPlanner planner = (NereidsPlanner) executor.planner(); - EnumSet<LocalExchangeType> types = collectLocalExchangeTypes(planner.getFragments()); + public void testNlJoinPoolingShapeDsl() throws Exception { + // doc rule "NL join / 池化": build BROADCAST, probe ADAPTIVE_PASSTHROUGH. + // With pooling, the probe-side scan is serial (1 task) so the framework + // additionally inserts an inner LE(PASSTHROUGH) below the ADAPTIVE_PASSTHROUGH + // to fan the serial scan out before adaptive redistribution. The build side + // comes through a cross-fragment Exchange (broadcast). + setupLocalShuffleSession(null); + assertPlanShape("select * from test.t1 a, test.t2 b where a.k1 > b.k1", + anyTree( + nestedLoopJoin( + localExchange(ADAPTIVE_PT, + localExchange(PT, + anyTree(olapScan()))), + localExchange(BROADCAST_LE, + anyTree(exchange()))))); + } - // With pooling scan and local shuffle planner, hash exchanges should be present - boolean hasHashShuffle = types.stream().anyMatch(t -> t.isHashShuffle()); - Assertions.assertTrue(hasHashShuffle || types.contains(LocalExchangeType.PASSTHROUGH), - "expected hash shuffle or passthrough in plan, actual: " + types); + @Test + public void testNullAwareLeftAntiJoinHasNoLocalExchange() throws Exception { + // doc rule "HashJoin / NULL_AWARE_LEFT_ANTI_JOIN": NOOP/NOOP/NOOP — no LE + // inserted on either side, in either direction. + // HashJoin(NULL_AWARE_LEFT_ANTI_JOIN) + // ← OlapScan(t1) (probe, no LE) + // ← Exchange ← OlapScan(t2) (build, no LE) + setupLocalShuffleSession(null); + assertPlanShape("select k1 from test.t1 where k1 not in (select k1 from test.t2)", + anyTree( + hashJoin( + olapScan(), + anyTree(exchange())) + .where(j -> j.getJoinOp() + == org.apache.doris.analysis.JoinOperator.NULL_AWARE_LEFT_ANTI_JOIN))); } @Test - public void testNoopLocalExchangeNotInjected() throws Exception { - connectContext.getSessionVariable().setEnableLocalShufflePlanner(true); - connectContext.getSessionVariable().setEnableLocalShuffle(true); - connectContext.getSessionVariable().setEnableNereidsDistributePlanner(true); - connectContext.getSessionVariable().setForceToLocalShuffle(false); + public void testAnalyticNoPartitionByHasNoLocalExchange() throws Exception { + // doc rule "Analytic / 无 partition by": serial path with PASSTHROUGH require + // upstream. Because the analytic fragment uses a cross-fragment UNPARTITIONED + // Exchange to gather data into a single instance, no LE is inserted between + // the AnalyticEvalNode and the Exchange — the Exchange already serves as the + // serial source. + setupLocalShuffleSession(null); + assertPlanShape("select k1, row_number() over () from test.t1", + anyTree( + analytic( + anyTree(exchange())))); + } - StmtExecutor executor = executeNereidsSql("explain distributed plan select * from test.t1 limit 1"); - NereidsPlanner planner = (NereidsPlanner) executor.planner(); - EnumSet<LocalExchangeType> types = collectLocalExchangeTypes(planner.getFragments()); + // -- Tier A: scenarios borrowed from Trino's TestAddExchangesPlans -- + + @Test + public void testUnionDistinctTwoPhaseAgg() throws Exception { + // Borrowed from Trino's testRepartitionForUnionWithAnyTableScans. + // UNION (not UNION ALL) implies DISTINCT — Doris realises that with a + // two-phase Aggregation above the UnionNode. The cross-fragment Exchanges + // feeding the Union pre-shuffle the scan outputs; the LE(PT) above the + // Union is the heavy-op fan-out for the partial agg over the gathered union. + // FINAL Agg ← Exchange ← PARTIAL Agg ← LE(PT) ← Union ← {Exchange ← scan} x 2 + setupLocalShuffleSession(null); + assertPlanShape("select k1 from test.t1 union select k1 from test.t2", + anyTree( + agg( + anyTree( + agg( + localExchange(PT, + union( + anyTree(olapScan()), + anyTree(olapScan())))))))); + } - Assertions.assertFalse(types.contains(LocalExchangeType.NOOP), - "NOOP local exchange should not be materialized as a node"); + @Test + public void testUnionAllBeforeHashJoin() throws Exception { + // Borrowed from Trino's testRepartitionForUnionAllBeforeHashJoin. + // UNION ALL feeds into a hash join — the join's hash requirement is + // satisfied by the cross-fragment Exchanges sitting under each Union branch + // (data is already hash-distributed by the time it reaches Union), so no + // intra-fragment LE is needed on either probe or build side. + setupLocalShuffleSession(null); + assertPlanShape("select * from (select k1 from test.t1 union all select k1 from test.t2) u " + + "join test.t1 t3 on u.k1=t3.k1", + anyTree( + hashJoin( + union( + anyTree(olapScan()), + anyTree(olapScan())), + anyTree(exchange())))); } @Test - public void testHashShuffleHasDistributeExprs() throws Exception { - connectContext.getSessionVariable().setEnableLocalShufflePlanner(true); - connectContext.getSessionVariable().setEnableLocalShuffle(true); - connectContext.getSessionVariable().setEnableNereidsDistributePlanner(true); - connectContext.getSessionVariable().setIgnoreStorageDataDistribution(true); - connectContext.getSessionVariable().setPipelineTaskNum("4"); - connectContext.getSessionVariable().setForceToLocalShuffle(false); - - // Use a simple agg query that reliably produces hash local exchange - StmtExecutor executor = executeNereidsSql( - "explain distributed plan select k1, k2, count(*) from test.t1 group by k1, k2"); - NereidsPlanner planner = (NereidsPlanner) executor.planner(); - List<LocalExchangeNode> localExchanges = collectLocalExchangeNodes(planner.getFragments()); + public void testWindowPartitionByBucketKey() throws Exception { + // Borrowed from Trino's testWindowIsExactlyPartitioned. + // PARTITION BY uses the table's bucket key (k1) — Doris's analytic eval + // is colocate-eligible. With pooling, the chain is: + // AnalyticEval ← Sort ← LE(LOCAL_HASH) ← LE(PT) ← scan + // The inner LE(PT) is the heavy-op fan-out for the serial pooling scan. + setupLocalShuffleSession(null); + assertPlanShape("select k1, row_number() over (partition by k1) from test.t1", + anyTree( + analytic( + sort( + localExchange(LOCAL_HASH, + localExchange(PT, + olapScan("t1"))))))); + } + + @Test + public void testWindowPartitionByNonBucketKey() throws Exception { + // Borrowed from Trino's testRowNumberIsExactlyPartitioned (negative variant). + // PARTITION BY uses a non-bucket key (k2) — colocate is not eligible, so + // the analytic eval lives in its own fragment fed by a cross-fragment + // hash-partitioned Exchange. No intra-fragment LE inside the analytic fragment. + setupLocalShuffleSession(null); + assertPlanShape("select k1, row_number() over (partition by k2) from test.t1", + anyTree( + analytic( + sort( + anyTree(exchange()))))); + } + + @Test + public void testNestedUnionAll() throws Exception { + // Borrowed from Trino's testNestedUnionAll. + // Three-way UNION ALL flattens into a single UnionNode with three + // cross-fragment Exchange children. No LE since there's no downstream + // consumer requiring hash distribution. + setupLocalShuffleSession(null); + assertPlanShape( + "select k1 from test.t1 union all " + + "(select k1 from test.t2 union all select k1 from test.t1)", + anyTree( + union( + anyTree(olapScan()), + anyTree(olapScan()), + anyTree(olapScan())))); + } + + @Test + public void testGroupedAggOverNlj() throws Exception { + // Borrowed from Trino's testGroupedAggregationAboveUnionAllCrossJoined + // (NLJ + agg variant). NLJ output is ADAPTIVE_PASSTHROUGH; the outer Agg + // requires HASH on k1. Because ADAPTIVE_PASSTHROUGH does not satisfy HASH, + // an LE(LOCAL_HASH) is inserted between the NLJ output and the Agg. + // Agg ← LE(LOCAL_HASH) ← NLJ + // ├─ LE(ADAPTIVE_PT) ← LE(PT) ← scan(t1) + // └─ LE(BROADCAST) ← Exchange ← scan(t2) + setupLocalShuffleSession(null); + assertPlanShape("select a.k1, count(*) from test.t1 a, test.t2 b where a.k1 > b.k1 group by a.k1", + anyTree( + agg( + localExchange(LOCAL_HASH, + nestedLoopJoin( + localExchange(ADAPTIVE_PT, + localExchange(PT, + anyTree(olapScan()))), + localExchange(BROADCAST_LE, + anyTree(exchange()))))))); + } + + @Test + public void testTopNQualifyPartitionSort() throws Exception { + // Borrowed from Trino's testTopNRowNumberIsExactlyPartitioned. + // ROW_NUMBER() filtered by `rn = 1` triggers Doris's PartitionSortNode + // optimisation (LOCAL phase pre-trims rows before the global Analytic). + // The chain becomes: + // AnalyticEval ← Sort ← LE(LOCAL_HASH) ← PartitionSort ← LE(PT) ← scan + // The LE(PT) under PartitionSort is the heavy-op fan-out for the pooling scan; + // the LE(LOCAL_HASH) above PartitionSort enforces hash partitioning for the + // global ROW_NUMBER computation. + setupLocalShuffleSession(null); + assertPlanShape( + "select k1, k2 from (select k1, k2, row_number() over (partition by k1 order by k2) rn " + + "from test.t1) t where rn = 1", + anyTree( + analytic( + sort( + localExchange(LOCAL_HASH, + partitionSort( + localExchange(PT, + olapScan("t1")))))))); + } + + @Test + public void testAggOverBroadcastJoin() throws Exception { + // Borrowed from Trino's testGroupedAggregationAboveUnionAll variant. + // count(*) over a broadcast join generates a two-phase aggregation; the + // partial agg sits directly on top of the HashJoin and the final agg lives + // in a separate fragment (count merge): + // FINAL Agg ← Exchange ← PARTIAL Agg ← HashJoin + // ├─ LE(PT) ← scan + // └─ LE(PASS_TO_ONE) ← Exchange ← scan + setupLocalShuffleSession(null); + assertPlanShape("select count(*) from test.t1 a join [broadcast] test.t2 b on a.k1=b.k1", + anyTree( + agg( + anyTree( + agg( + hashJoin( + localExchange(PT, + olapScan()), + localExchange(PASS_TO_ONE_LE, + anyTree(exchange())))))))); + } + + @Test + public void testNonSerialScanKeepsBucketHashDistribution() throws Exception { + // Non-pooling scan with pipelineTaskNum=1 → the BUCKET_HASH_SHUFFLE output of + // the colocated scan is preserved end-to-end; no LOCAL_EXECUTION_HASH_SHUFFLE + // is ever introduced. Only a serial-source PASSTHROUGH appears (for the + // SortNode's merge-by-exchange). + setupLocalShuffleSession(sv -> { + sv.setIgnoreStorageDataDistribution(false); + try { + sv.setPipelineTaskNum("1"); + } catch (Exception e) { + throw new RuntimeException(e); + } + }); + assertNoLocalExchangeOfType( + "select k1, count(*) from test.t1 group by k1 order by k1", + LocalExchangeType.LOCAL_EXECUTION_HASH_SHUFFLE); + } + + @Test + public void testJoinPlanContainsHashShuffle() throws Exception { + // Pooling hash join under an aggregate. Both sides of the join feed through + // local exchanges; the agg above the join requires LOCAL_EXECUTION_HASH_SHUFFLE. + // Agg → LE(LOCAL_HASH) → HashJoin + // ← LE(PASSTHROUGH) ← OlapScan(t1) (probe) + // ← LE(PASS_TO_ONE) ← Exchange ← OlapScan(t2) (build) + setupLocalShuffleSession(null); + assertPlanShape( + "select a.k1, count(*) from test.t1 a join test.t2 b on a.k1 = b.k1 group by a.k1", + anyTree( + agg( + localExchange(LOCAL_HASH, + hashJoin( + localExchange(PT, + olapScan("t1")), + localExchange(PASS_TO_ONE_LE, + anyTree(exchange()))))))); + } - boolean hasHashShuffleWithExpr = localExchanges.stream().anyMatch(node -> - node.getExchangeType().isHashShuffle() - && node.getDistributeExprLists() != null - && !node.getDistributeExprLists().isEmpty()); - String exchangeInfo = localExchanges.stream() - .map(n -> n.getExchangeType() + "(exprs=" + n.getDistributeExprLists() + ")") - .collect(java.util.stream.Collectors.joining(", ")); - Assertions.assertTrue(hasHashShuffleWithExpr, - "expected at least one hash local exchange with distribute exprs, found: " + exchangeInfo); + @Test + public void testNoopLocalExchangeNotInjected() throws Exception { + // A simple LIMIT scan plan should contain no local exchanges of any kind — + // and most importantly, no synthesized NOOP node. doc rule "NOOP is meta, + // never materialized": the planner uses NOOP as a 'skip' signal during + // resolution but never instantiates a LocalExchangeNode with type NOOP. + setupLocalShuffleSession(null); + assertNoLocalExchangeOfType("select * from test.t1 limit 1", LocalExchangeType.NOOP); + } + + @Test + public void testHashShuffleHasDistributeExprs() throws Exception { + // Same scan→agg plan as testAggFromScanShapeDsl, but with a predicate that + // checks the inserted LE(LOCAL_HASH) carries non-empty distribute expressions + // (without which the BE would not know which columns to hash on). + setupLocalShuffleSession(null); + assertPlanShape("select k1, k2, count(*) from test.t1 group by k1, k2", + anyTree( + agg( + localExchange(LOCAL_HASH, + localExchange(PT, olapScan("t1"))) + .where(le -> le.getDistributeExprLists() != null + && !le.getDistributeExprLists().isEmpty())))); } @Test @@ -235,70 +460,49 @@ public class LocalExchangePlannerTest extends TestWithFeService implements PlanS @Test public void testSetOperationUnderAggHasHashShuffle() throws Exception { - connectContext.getSessionVariable().setEnableLocalShufflePlanner(true); - connectContext.getSessionVariable().setEnableLocalShuffle(true); - connectContext.getSessionVariable().setEnableNereidsDistributePlanner(true); - connectContext.getSessionVariable().setIgnoreStorageDataDistribution(false); - connectContext.getSessionVariable().setPipelineTaskNum("4"); - connectContext.getSessionVariable().setForceToLocalShuffle(false); - - StmtExecutor executor = executeNereidsSql( - "explain distributed plan select k1, count(*) from (" - + "select k1 from test.t1 union all select k1 from test.t2" - + ") u group by k1"); - NereidsPlanner planner = (NereidsPlanner) executor.planner(); - EnumSet<LocalExchangeType> types = collectLocalExchangeTypes(planner.getFragments()); - - // With non-pooling scan and colocated bucket distribution, local exchanges may - // not be inserted. Verify plan at least doesn't crash and contains valid exchange types. - boolean hasLocalExchange = !types.isEmpty(); - // If local exchanges are present, they should include hash shuffle types - if (hasLocalExchange) { - boolean hasHashShuffle = types.stream().anyMatch(t -> t.isHashShuffle()); - Assertions.assertTrue(hasHashShuffle, - "expected hash shuffle in set-operation plan when exchanges present, actual: " + types); - } + // Non-pooling UNION ALL under an agg. The outer agg's group key requires a + // LOCAL_EXECUTION_HASH_SHUFFLE directly above the UnionNode (above each + // branch's pre-agg). + setupLocalShuffleSession(sv -> sv.setIgnoreStorageDataDistribution(false)); + assertPlanShape( + "select k1, count(*) from (select k1 from test.t1 union all select k1 from test.t2) u group by k1", + anyTree( + agg( + localExchange(LOCAL_HASH, + union( + anyTree(olapScan()), + anyTree(olapScan())))))); } @Test public void testAnalyticPlanContainsPassthroughAndLocalHashShuffle() throws Exception { - connectContext.getSessionVariable().setEnableLocalShufflePlanner(true); - connectContext.getSessionVariable().setEnableLocalShuffle(true); - connectContext.getSessionVariable().setEnableNereidsDistributePlanner(true); - connectContext.getSessionVariable().setIgnoreStorageDataDistribution(true); - connectContext.getSessionVariable().setPipelineTaskNum("4"); - connectContext.getSessionVariable().setForceToLocalShuffle(false); - - StmtExecutor executor = executeNereidsSql( - "explain distributed plan select k1, k2, row_number() over(partition by k1 order by k2) " - + "from test.t1 order by k1, k2"); - NereidsPlanner planner = (NereidsPlanner) executor.planner(); - EnumSet<LocalExchangeType> types = collectLocalExchangeTypes(planner.getFragments()); - - // Analytic plan: mergeByExchange sort inserts PASSTHROUGH. - // With pooling scan (ignore_storage_data_distribution=true), hash or passthrough exchanges expected. - Assertions.assertTrue(types.contains(LocalExchangeType.PASSTHROUGH), - "expected PASSTHROUGH in analytic plan, actual: " + types); + // doc rule "Analytic / 有 partition by / 池化": LE(LOCAL_HASH) for partition + // redistribution, plus a LE(PASSTHROUGH) heavy-op fan-out below it for the + // 1-task pooling scan, then LE(PASSTHROUGH) above the AnalyticEval for the + // outer order-by merge. + // SortNode → LE(PASSTHROUGH) → AnalyticEval → SortNode + // → LE(LOCAL_HASH) → LE(PASSTHROUGH) → scan + setupLocalShuffleSession(null); + assertPlanShape( + "select k1, k2, row_number() over(partition by k1 order by k2) from test.t1 order by k1, k2", + anyTree( + sort( + localExchange(PT, + analytic( + sort( + localExchange(LOCAL_HASH, + localExchange(PT, + olapScan("t1"))))))))); } @Test public void testGroupingSetsPlanContainsHashShuffle() throws Exception { - connectContext.getSessionVariable().setEnableLocalShufflePlanner(true); - connectContext.getSessionVariable().setEnableLocalShuffle(true); - connectContext.getSessionVariable().setEnableNereidsDistributePlanner(true); - connectContext.getSessionVariable().setIgnoreStorageDataDistribution(false); - connectContext.getSessionVariable().setPipelineTaskNum("4"); - connectContext.getSessionVariable().setForceToLocalShuffle(false); - - StmtExecutor executor = executeNereidsSql( - "explain distributed plan select k1, k2, sum(v1) from test.t1 " - + "group by grouping sets((k1), (k1, k2))"); - NereidsPlanner planner = (NereidsPlanner) executor.planner(); - EnumSet<LocalExchangeType> types = collectLocalExchangeTypes(planner.getFragments()); - - Assertions.assertFalse(types.contains(LocalExchangeType.LOCAL_EXECUTION_HASH_SHUFFLE), - "grouping-sets plan should not force local execution hash shuffle when scan keeps bucket" - + " distribution, actual: " + types); + // Non-pooling grouping sets keeps the colocated BUCKET_HASH_SHUFFLE output of + // the scan all the way through Repeat→Agg; no LE(LOCAL_HASH) is needed. + setupLocalShuffleSession(sv -> sv.setIgnoreStorageDataDistribution(false)); + assertNoLocalExchangeOfType( + "select k1, k2, sum(v1) from test.t1 group by grouping sets((k1), (k1, k2))", + LocalExchangeType.LOCAL_EXECUTION_HASH_SHUFFLE); } @Test @@ -329,23 +533,39 @@ public class LocalExchangePlannerTest extends TestWithFeService implements PlanS @Test public void testMixedPlanWithPoolingScan() throws Exception { - connectContext.getSessionVariable().setEnableLocalShufflePlanner(true); - connectContext.getSessionVariable().setEnableLocalShuffle(true); - connectContext.getSessionVariable().setEnableNereidsDistributePlanner(true); - connectContext.getSessionVariable().setIgnoreStorageDataDistribution(true); - connectContext.getSessionVariable().setPipelineTaskNum("4"); - connectContext.getSessionVariable().setForceToLocalShuffle(false); - - StmtExecutor executor = executeNereidsSql( - "explain distributed plan select u.k1, count(*) from (" - + "select k1, k2 from test.t1 group by grouping sets((k1), (k1, k2))" - + ") u join test.t2 b on u.k1 = b.k1 group by u.k1"); - NereidsPlanner planner = (NereidsPlanner) executor.planner(); - EnumSet<LocalExchangeType> types = collectLocalExchangeTypes(planner.getFragments()); + // Pooling: grouping-sets sub-query JOINed and re-aggregated. Probe side + // wraps the inner agg/Repeat with LE(LOCAL_HASH) over LE(PASSTHROUGH); build + // side comes through LE(PASS_TO_ONE) over an inter-fragment Exchange. + // Agg → HashJoin + // ← Agg → LE(LOCAL_HASH) → LE(PASSTHROUGH) → Repeat → scan(t1) + // ← LE(PASS_TO_ONE) → Exchange → scan(t2) + setupLocalShuffleSession(null); + assertPlanShape( + "select u.k1, count(*) from (select k1, k2 from test.t1 group by grouping sets((k1), (k1, k2))) u " + + "join test.t2 b on u.k1 = b.k1 group by u.k1", + anyTree( + agg( + hashJoin( + agg( + localExchange(LOCAL_HASH, + localExchange(PT, + anyTree(olapScan("t1"))))), + localExchange(PASS_TO_ONE_LE, + anyTree(exchange())))))); + } - // With pooling scan, local exchanges should be present - Assertions.assertFalse(types.isEmpty(), - "expected local exchanges in mixed plan with pooling scan, actual: " + types); + @Test + public void testUnionAllScanAndValues() throws Exception { + // Tier B (borrowed from Trino): UNION ALL of a real OlapScan and inline + // VALUES rows. The values branches flow through their own fragments and + // land at a UnionNode that gathers via Exchange. Verifies the + // scan-side LE(PASSTHROUGH) heavy-op fan-out is still inserted while the + // values branches contribute no local exchanges (already serial sources). + setupLocalShuffleSession(null); + assertPlanShape("select k1 from test.t1 union all select 1 union all select 2", + anyTree( + union( + anyTree(exchange())))); } private EnumSet<LocalExchangeType> collectLocalExchangeTypes(List<PlanFragment> fragments) { --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
