This is an automated email from the ASF dual-hosted git repository.

924060929 pushed a commit to branch fe_local_shuffle_rebase
in repository https://gitbox.apache.org/repos/asf/doris.git

commit 635e17fd838e382e06deb3b547cd9c9517730fa4
Author: 924060929 <[email protected]>
AuthorDate: Wed May 20 13:44:40 2026 +0800

    [fix](local shuffle) Mirror BE partition_exprs and sender_count for FE-planned LE

    Two correctness issues in the FE-planned local-shuffle path, both surfaced
    by single-tablet POOLING / share-scan fragments.

    1. FE planner inserted LE(LOCAL_HASH) below a streaming partial agg with
       distributeExprLists = child table distribution (e.g. [id]) instead of
       grouping_exprs (e.g. [category]). BE's AggSinkOperatorX /
       StreamingAggOperatorX::update_operator picks _partition_exprs =
       grouping_exprs when the chain is not followed_by_shuffled_operator — the
       common case for a streaming preagg at fragment root with only a
       cross-fragment HASH ExchangeSink above. Using child distribution
       scattered same-group rows across N partial-agg instances, turning the
       preagg into a no-op and breaking row-arrival order at the downstream
       merge-finalize (manifests as non-deterministic group_concat /
       py_json_array_agg output, e.g. test_python_udaf_complex json_array_agg).

       Fix: add overridable
       PlanNode#getLocalExchangeDistributeExprs(childIndex, followedByShuffled)
       defaulting to the child's distribution, and override it on
       AggregationNode to mirror BE's update_operator: use child distribution
       only when (followedByShuffled || hasDistinct); otherwise use
       grouping_exprs.

    2. BE _create_deferred_local_exchangers used sender_count =
       upstream_pipe->num_tasks() with no max(_, _num_instances). When the
       upstream pipeline has a serial source (POOLING OlapScan, serial
       Exchange), num_tasks() stays at 1 and
       _propagate_local_exchange_num_tasks Pass 1 deliberately does not raise
       it, but the exchanger is shared across all _num_instances fragment
       instances on this BE — each instance closes once, so total close-count
       = _num_instances. Initial 1 minus _num_instances closes drove
       _running_sink_operators negative (e.g. -5 for 6 instances, -15 for 16),
       so the exchanger never reached "all senders done", downstream sources
       blocked on SHUFFLE_DATA_DEPENDENCY forever, and the query hung.
       Fragments hold block references through the hang; on BE shutdown
       mem_tracker_limiter::~MemTrackerLimiter fired FATAL, aborting BE and
       producing the build-948971 "stop grace fail" — root cause being
       dictionary_p0.test_dict_load_and_get_ip_trie's refresh dictionary
       running scan + LE(PASSTHROUGH) + cross-fragment DICTIONARY_SINK.

       Fix: mirror BE-planned _add_local_exchange_impl (~line 1023) which uses
       std::max(cur_pipe->num_tasks(), _num_instances).

    Tests
    - New LocalExchangePlannerTest#testStreamingAggHashShuffleUsesGroupingExprs:
      with t1 DISTRIBUTED BY HASH(k1) and SELECT k2, count(*) GROUP BY k2
      (k2 non-bucket -> two-phase agg), asserts the LE below the streaming
      partial agg carries [k2] (grouping key) not [k1] (child distribution).
      Verified failing pre-fix, passing post-fix. Whole class (26 tests) green.
    - Local cluster (output_test, 29030): group_concat probe stable 1,2,3,4,5
      across 20 runs after both fixes; matches BE-planner=false output.
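The sender-count arithmetic in item 2 can be checked with a small stand-alone sketch. It is only an illustration under simplifying assumptions, not the BE implementation: the class and method names below are hypothetical, the counter stands in for _running_sink_operators, and each fragment instance is assumed to close its sink exactly once, as the message describes for a serial source.

```java
public final class SenderCountSketch {
    // Hypothetical model of the shared counter: start at senderCount and
    // decrement once per fragment instance that closes its sink.
    public static int runningSinksAfterAllCloses(int senderCount, int numInstances) {
        int running = senderCount;
        for (int i = 0; i < numInstances; i++) {
            running--; // one close per fragment instance
        }
        return running;
    }

    // Pre-fix choice described in the message: sender_count = num_tasks().
    public static int senderCountPreFix(int numTasks) {
        return numTasks;
    }

    // Post-fix choice: sender_count = max(num_tasks(), _num_instances).
    public static int senderCountPostFix(int numTasks, int numInstances) {
        return Math.max(numTasks, numInstances);
    }

    public static void main(String[] args) {
        int numTasks = 1; // serial source: num_tasks() stays at 1
        for (int numInstances : new int[] {6, 16}) {
            int before = runningSinksAfterAllCloses(senderCountPreFix(numTasks), numInstances);
            int after = runningSinksAfterAllCloses(
                    senderCountPostFix(numTasks, numInstances), numInstances);
            // Prints -5 / 0 for 6 instances and -15 / 0 for 16 instances.
            System.out.println(numInstances + " instances: pre-fix=" + before
                    + ", post-fix=" + after);
        }
    }
}
```

Under these assumptions the pre-fix counter ends at -5 and -15, matching the values quoted above, while the post-fix counter reaches exactly 0, the condition the exchanger needs to report that all senders are done.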
---
 be/src/exec/pipeline/pipeline_fragment_context.cpp | 14 +++++++-
 .../org/apache/doris/planner/AggregationNode.java  | 22 +++++++++++++
 .../java/org/apache/doris/planner/PlanNode.java    | 24 +++++++++++++-
 .../apache/doris/qe/LocalExchangePlannerTest.java  | 37 ++++++++++++++++++++++
 4 files changed, 95 insertions(+), 2 deletions(-)

diff --git a/be/src/exec/pipeline/pipeline_fragment_context.cpp b/be/src/exec/pipeline/pipeline_fragment_context.cpp
index 1cbc06f5e2d..fac2649b2c6 100644
--- a/be/src/exec/pipeline/pipeline_fragment_context.cpp
+++ b/be/src/exec/pipeline/pipeline_fragment_context.cpp
@@ -718,7 +718,19 @@ Status PipelineFragmentContext::_build_pipelines(ObjectPool* pool, const Descrip
 
 Status PipelineFragmentContext::_create_deferred_local_exchangers() {
     for (auto& info : _deferred_exchangers) {
-        const int sender_count = info.upstream_pipe->num_tasks();
+        // sender_count = total number of sink instances that will call sub_running_sink_operators
+        // on this exchanger's shared_state. shared_state is shared across all `_num_instances`
+        // fragment instances on this BE — each instance contributes `upstream_pipe->num_tasks()`
+        // sink tasks. When the upstream pipeline has a serial source (e.g. POOLING OlapScan,
+        // serial Exchange), `num_tasks()` stays at 1 and `_propagate_local_exchange_num_tasks`
+        // Pass 1 deliberately does not raise it, so the close-count is `_num_instances`, not 1.
+        // Without the max(_, _num_instances), sub_running_sink_operators decrements past zero,
+        // the exchanger never sees `_running_sink_operators == 0`, downstream sources block
+        // forever on SHUFFLE_DATA_DEPENDENCY, and the query hangs — eventually triggering a
+        // mem-tracker leak FATAL on the leftover blocks the exchanger still holds.
+        // Mirrors BE-planned `_add_local_exchange_impl` (line ~1023) which already uses
+        // `std::max(cur_pipe->num_tasks(), _num_instances)`.
+        const int sender_count = std::max(info.upstream_pipe->num_tasks(), _num_instances);
         switch (info.partition_type) {
         case TLocalPartitionType::LOCAL_EXECUTION_HASH_SHUFFLE:
         case TLocalPartitionType::GLOBAL_EXECUTION_HASH_SHUFFLE:
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/AggregationNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/AggregationNode.java
index 35463dbf080..210252df253 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/AggregationNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/AggregationNode.java
@@ -372,6 +372,28 @@ public class AggregationNode extends PlanNode {
         return aggInfo.getGroupingExprs();
     }
 
+    @Override
+    protected List<Expr> getLocalExchangeDistributeExprs(int childIndex, boolean followedByShuffled) {
+        // Mirror BE AggSinkOperatorX::update_operator / StreamingAggOperatorX::update_operator:
+        //   _partition_exprs = (distribute_expr_lists set && (followed_by_shuffled || has_distinct))
+        //       ? distribute_expr_lists[0] : grouping_exprs
+        // The HASH LocalExchange must partition by _partition_exprs so a streaming partial preagg
+        // locally collapses same-key rows. Using child distribution (default) for a non-shuffled
+        // chain scatters same-group rows across N instances, leaving partial_preagg essentially a
+        // no-op and breaking row-arrival order at downstream merge-finalize (e.g. group_concat).
+        List<Expr> childDist = getChildDistributeExprList(childIndex);
+        boolean hasDistinct = aggInfo.getAggregateExprs().stream()
+                .map(FunctionCallExpr::getFnName)
+                .filter(name -> name != null)
+                .map(name -> name.getFunction())
+                .filter(name -> name != null)
+                .anyMatch(name -> name.startsWith("multi_distinct_"));
+        if (childDist != null && !childDist.isEmpty() && (followedByShuffled || hasDistinct)) {
+            return childDist;
+        }
+        return Lists.newArrayList(aggInfo.getGroupingExprs());
+    }
+
     @Override
     public boolean requiresShuffleForCorrectness() {
         // Mirrors BE's AggSinkOperatorX::is_shuffled_operator() exactly:
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/PlanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/PlanNode.java
index 125ccaecc25..fdb9f93414f 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/PlanNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/PlanNode.java
@@ -1168,7 +1168,7 @@ public abstract class PlanNode extends TreeNode<PlanNode> {
         // 5. Resolve exchange type and create LE node
         LocalExchangeType preferType = AddLocalExchange.resolveExchangeType(
                 require, translatorContext, this, childOutput.first);
-        List<Expr> distributeExprs = getChildDistributeExprList(childIndex);
+        List<Expr> distributeExprs = getLocalExchangeDistributeExprs(childIndex, selfOrInheritedShuffled);
         PlanNode leNode = createLocalExchange(translatorContext, childOutput.first, preferType, distributeExprs);
         return Pair.of(leNode, preferType);
     }
@@ -1214,6 +1214,28 @@ public abstract class PlanNode extends TreeNode<PlanNode> {
         }
     }
 
+    /**
+     * Return distribute exprs used as the hash key when {@link #enforceRequire} inserts a
+     * LocalExchange between this node and {@code child[childIndex]}. Default returns the
+     * child's output distribution ({@code childrenDistributeExprLists[childIndex]}).
+     *
+     * <p>Subclasses override this to mirror BE-specific {@code _partition_exprs} logic. For
+     * example BE's {@code AggSinkOperatorX::update_operator} picks
+     * {@code grouping_exprs} when {@code !_followed_by_shuffled_operator && !has_distinct},
+     * even though the child outputs a different (hash) distribution — and the LE inserted
+     * before the streaming preagg must partition by {@code grouping_exprs} so a local
+     * partial reduce actually collapses same-key rows. Using the default (child
+     * distribution) here would scatter same-group rows across instances and degrade the
+     * preagg to a no-op, also breaking row-arrival order at downstream merge-finalize.
+     *
+     * @param childIndex which child
+     * @param followedByShuffled whether the chain at this node is followed by a shuffled
+     *                           operator (mirrors BE's {@code _followed_by_shuffled_operator})
+     */
+    protected List<Expr> getLocalExchangeDistributeExprs(int childIndex, boolean followedByShuffled) {
+        return getChildDistributeExprList(childIndex);
+    }
+
     /**
      * Returns the operator's own semantically-defined partition expressions
      * (e.g. GROUP BY exprs for aggregation, PARTITION BY exprs for analytic).
diff --git a/fe/fe-core/src/test/java/org/apache/doris/qe/LocalExchangePlannerTest.java b/fe/fe-core/src/test/java/org/apache/doris/qe/LocalExchangePlannerTest.java
index 52536492ef9..55b3c62b87c 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/qe/LocalExchangePlannerTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/qe/LocalExchangePlannerTest.java
@@ -449,6 +449,43 @@ public class LocalExchangePlannerTest extends TestWithFeService implements PlanS
                                 && !le.getDistributeExprLists().isEmpty()))));
     }
 
+    @Test
+    public void testStreamingAggHashShuffleUsesGroupingExprs() throws Exception {
+        // Regression for: FE-planned LE(LOCAL_HASH) under a streaming partial agg used
+        // the child's table distribution (e.g. `k1`) instead of the grouping keys
+        // (e.g. `k2`). BE's AggSinkOperatorX/StreamingAggOperatorX::update_operator
+        // picks `_partition_exprs = grouping_exprs` when the chain is NOT followed by
+        // a shuffled operator (the common case where the streaming preagg sits at
+        // fragment root with only a cross-fragment HASH ExchangeSink above). Using
+        // child distribution instead scatters same-group rows across N instances,
+        // turning the partial preagg into a no-op and corrupting row-arrival order at
+        // downstream merge-finalize (manifests as e.g. non-deterministic
+        // group_concat / py_json_array_agg output).
+        //
+        // Table t1 is DISTRIBUTED BY HASH(k1). GROUP BY k2 forces a cross-fragment
+        // exchange (and thus a two-phase aggregation): the streaming partial agg lives
+        // in the lower fragment, with an FE-inserted LE(LOCAL_HASH) below it. The fix
+        // makes that LE carry [k2] (the grouping key) rather than [k1] (the child
+        // table distribution).
+        setupLocalShuffleSession(null);
+        assertPlanShape(
+                "select k2, count(*) from test.t1 group by k2",
+                anyTree(
+                        agg(
+                                localExchange(LOCAL_HASH,
+                                        localExchange(PT, olapScan("t1")))
+                                        .where(le -> {
+                                            List<org.apache.doris.analysis.Expr> exprs =
+                                                    le.getDistributeExprLists();
+                                            if (exprs == null || exprs.size() != 1) {
+                                                return false;
+                                            }
+                                            org.apache.doris.analysis.Expr e = exprs.get(0);
+                                            return e instanceof org.apache.doris.analysis.SlotRef
+                                                    && "k2".equals(((org.apache.doris.analysis.SlotRef) e).getCol());
+                                        }))));
+    }
+
     @Test
     public void testRequireHashSatisfyAllHashShuffleTypes() {
         LocalExchangeNode.LocalExchangeTypeRequire requireHash = LocalExchangeNode.LocalExchangeTypeRequire.requireHash();
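The selection rule that the AggregationNode override applies can be exercised in isolation. The sketch below is a simplified stand-in, not Doris code: expressions are modelled as plain strings, and the class and method names are hypothetical. It reproduces only the branch condition visible in the diff above.

```java
import java.util.Arrays;
import java.util.List;

public final class PartitionExprSketch {
    // Hypothetical stand-in for the override: use the child distribution only
    // when it is non-empty and (followedByShuffled || hasDistinct); otherwise
    // fall back to the grouping expressions.
    public static List<String> chooseDistributeExprs(List<String> childDist,
            List<String> groupingExprs, boolean followedByShuffled, boolean hasDistinct) {
        if (childDist != null && !childDist.isEmpty() && (followedByShuffled || hasDistinct)) {
            return childDist;
        }
        return groupingExprs;
    }

    public static void main(String[] args) {
        List<String> childDist = Arrays.asList("k1");     // table DISTRIBUTED BY HASH(k1)
        List<String> groupingExprs = Arrays.asList("k2"); // GROUP BY k2

        // Streaming preagg not followed by a shuffled operator: hash on [k2].
        System.out.println(chooseDistributeExprs(childDist, groupingExprs, false, false));
        // Followed by a shuffled operator: keep the child distribution [k1].
        System.out.println(chooseDistributeExprs(childDist, groupingExprs, true, false));
    }
}
```

The first call corresponds to the case covered by testStreamingAggHashShuffleUsesGroupingExprs, where the LE is expected to carry [k2] rather than [k1].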
