This is an automated email from the ASF dual-hosted git repository.

924060929 pushed a commit to branch fe_local_shuffle_rebase_wip
in repository https://gitbox.apache.org/repos/asf/doris.git

commit 3287a742f7be263ae28fab8b1bcc4f6e22cced3b
Author: 924060929 <[email protected]>
AuthorDate: Wed May 20 13:44:40 2026 +0800

    [fix](local shuffle) Mirror BE partition_exprs and sender_count for FE-planned LE
    
    Two correctness issues in the FE-planned local-shuffle path, both surfaced
    by single-tablet POOLING / share-scan fragments.
    
    1. FE planner inserted LE(LOCAL_HASH) below a streaming partial agg with
       distributeExprLists = child table distribution (e.g. [id]) instead of
       grouping_exprs (e.g. [category]).  BE's AggSinkOperatorX /
       StreamingAggOperatorX::update_operator picks _partition_exprs =
       grouping_exprs when the chain is not followed_by_shuffled_operator —
       the common case for a streaming preagg at fragment root with only a
       cross-fragment HASH ExchangeSink above.  Using child distribution
       scattered same-group rows across N partial-agg instances, turning the
       preagg into a no-op and breaking row-arrival order at the downstream
       merge-finalize (manifests as non-deterministic group_concat /
       py_json_array_agg output, e.g. test_python_udaf_complex json_array_agg).
    
       Fix: add overridable PlanNode#getLocalExchangeDistributeExprs(childIndex,
       followedByShuffled) defaulting to the child's distribution, and override
       it on AggregationNode to mirror BE's update_operator: use child
       distribution only when (followedByShuffled || hasDistinct); otherwise
       use grouping_exprs.
    
    2. BE _create_deferred_local_exchangers used sender_count =
       upstream_pipe->num_tasks() with no max(_, _num_instances).  When the
       upstream pipeline has a serial source (POOLING OlapScan, serial
       Exchange), num_tasks() stays at 1 and _propagate_local_exchange_num_tasks
       Pass 1 deliberately does not raise it.  The exchanger's shared state,
       however, is shared by all _num_instances fragment instances on this BE,
       and each instance closes once, so the total close-count is
       _num_instances.  Starting from 1, those _num_instances closes drove
       _running_sink_operators negative
       (e.g. -5 for 6 instances, -15 for 16), so the exchanger never reached
       "all senders done", downstream sources blocked on
       SHUFFLE_DATA_DEPENDENCY forever, and the query hung.  Fragments hold
       block references through the hang; on BE shutdown
       mem_tracker_limiter::~MemTrackerLimiter fired FATAL, aborting BE and
       producing the build-948971 "stop grace fail".  The root cause was the
       refresh dictionary statement in dictionary_p0.test_dict_load_and_get_ip_trie,
       which runs scan + LE(PASSTHROUGH) + cross-fragment DICTIONARY_SINK.
    
       Fix: mirror BE-planned _add_local_exchange_impl (~line 1023) which uses
       std::max(cur_pipe->num_tasks(), _num_instances).
    
    Tests
    - New LocalExchangePlannerTest#testStreamingAggHashShuffleUsesGroupingExprs:
      with t1 DISTRIBUTED BY HASH(k1) and SELECT k2, count(*) GROUP BY k2
      (k2 non-bucket -> two-phase agg), asserts the LE below the streaming
      partial agg carries [k2] (grouping key) not [k1] (child distribution).
      Verified failing pre-fix, passing post-fix.  Whole class (26 tests) green.
    - Local cluster (output_test, 29030): group_concat probe stable 1,2,3,4,5
      across 20 runs after both fixes; matches BE-planner=false output.
---
 be/src/exec/pipeline/pipeline_fragment_context.cpp | 14 +++++++-
 .../org/apache/doris/planner/AggregationNode.java  | 22 +++++++++++++
 .../java/org/apache/doris/planner/PlanNode.java    | 24 +++++++++++++-
 .../apache/doris/qe/LocalExchangePlannerTest.java  | 37 ++++++++++++++++++++++
 4 files changed, 95 insertions(+), 2 deletions(-)

diff --git a/be/src/exec/pipeline/pipeline_fragment_context.cpp 
b/be/src/exec/pipeline/pipeline_fragment_context.cpp
index 1cbc06f5e2d..fac2649b2c6 100644
--- a/be/src/exec/pipeline/pipeline_fragment_context.cpp
+++ b/be/src/exec/pipeline/pipeline_fragment_context.cpp
@@ -718,7 +718,19 @@ Status 
PipelineFragmentContext::_build_pipelines(ObjectPool* pool, const Descrip
 
 Status PipelineFragmentContext::_create_deferred_local_exchangers() {
     for (auto& info : _deferred_exchangers) {
-        const int sender_count = info.upstream_pipe->num_tasks();
+        // sender_count = total number of sink instances that will call 
sub_running_sink_operators
+        // on this exchanger's shared_state.  shared_state is shared across 
all `_num_instances`
+        // fragment instances on this BE — each instance contributes 
`upstream_pipe->num_tasks()`
+        // sink tasks.  When the upstream pipeline has a serial source (e.g. 
POOLING OlapScan,
+        // serial Exchange), `num_tasks()` stays at 1 and 
`_propagate_local_exchange_num_tasks`
+        // Pass 1 deliberately does not raise it, so the close-count is 
`_num_instances`, not 1.
+        // Without the max(_, _num_instances), sub_running_sink_operators 
decrements past zero,
+        // the exchanger never sees `_running_sink_operators == 0`, downstream 
sources block
+        // forever on SHUFFLE_DATA_DEPENDENCY, and the query hangs — 
eventually triggering a
+        // mem-tracker leak FATAL on the leftover blocks the exchanger still 
holds.
+        // Mirrors BE-planned `_add_local_exchange_impl` (line ~1023) which 
already uses
+        // `std::max(cur_pipe->num_tasks(), _num_instances)`.
+        const int sender_count = std::max(info.upstream_pipe->num_tasks(), 
_num_instances);
         switch (info.partition_type) {
         case TLocalPartitionType::LOCAL_EXECUTION_HASH_SHUFFLE:
         case TLocalPartitionType::GLOBAL_EXECUTION_HASH_SHUFFLE:
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/planner/AggregationNode.java 
b/fe/fe-core/src/main/java/org/apache/doris/planner/AggregationNode.java
index 35463dbf080..210252df253 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/AggregationNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/AggregationNode.java
@@ -372,6 +372,28 @@ public class AggregationNode extends PlanNode {
         return aggInfo.getGroupingExprs();
     }
 
+    @Override
+    protected List<Expr> getLocalExchangeDistributeExprs(int childIndex, 
boolean followedByShuffled) {
+        // Mirror BE AggSinkOperatorX::update_operator / 
StreamingAggOperatorX::update_operator:
+        //   _partition_exprs = (distribute_expr_lists set && 
(followed_by_shuffled || has_distinct))
+        //                      ? distribute_expr_lists[0] : grouping_exprs
+        // The HASH LocalExchange must partition by _partition_exprs so a 
streaming partial preagg
+        // locally collapses same-key rows.  Using child distribution 
(default) for a non-shuffled
+        // chain scatters same-group rows across N instances, leaving 
partial_preagg essentially a
+        // no-op and breaking row-arrival order at downstream merge-finalize 
(e.g. group_concat).
+        List<Expr> childDist = getChildDistributeExprList(childIndex);
+        boolean hasDistinct = aggInfo.getAggregateExprs().stream()
+                .map(FunctionCallExpr::getFnName)
+                .filter(name -> name != null)
+                .map(name -> name.getFunction())
+                .filter(name -> name != null)
+                .anyMatch(name -> name.startsWith("multi_distinct_"));
+        if (childDist != null && !childDist.isEmpty() && (followedByShuffled 
|| hasDistinct)) {
+            return childDist;
+        }
+        return Lists.newArrayList(aggInfo.getGroupingExprs());
+    }
+
     @Override
     public boolean requiresShuffleForCorrectness() {
         // Mirrors BE's AggSinkOperatorX::is_shuffled_operator() exactly:
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/PlanNode.java 
b/fe/fe-core/src/main/java/org/apache/doris/planner/PlanNode.java
index 125ccaecc25..fdb9f93414f 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/PlanNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/PlanNode.java
@@ -1168,7 +1168,7 @@ public abstract class PlanNode extends TreeNode<PlanNode> 
{
         // 5. Resolve exchange type and create LE node
         LocalExchangeType preferType = AddLocalExchange.resolveExchangeType(
                 require, translatorContext, this, childOutput.first);
-        List<Expr> distributeExprs = getChildDistributeExprList(childIndex);
+        List<Expr> distributeExprs = 
getLocalExchangeDistributeExprs(childIndex, selfOrInheritedShuffled);
         PlanNode leNode = createLocalExchange(translatorContext, 
childOutput.first, preferType, distributeExprs);
         return Pair.of(leNode, preferType);
     }
@@ -1214,6 +1214,28 @@ public abstract class PlanNode extends 
TreeNode<PlanNode> {
         }
     }
 
+    /**
+     * Return distribute exprs used as the hash key when {@link 
#enforceRequire} inserts a
+     * LocalExchange between this node and {@code child[childIndex]}.  Default 
returns the
+     * child's output distribution ({@code 
childrenDistributeExprLists[childIndex]}).
+     *
+     * <p>Subclasses override this to mirror BE-specific {@code 
_partition_exprs} logic.  For
+     * example BE's {@code AggSinkOperatorX::update_operator} picks
+     * {@code grouping_exprs} when {@code !_followed_by_shuffled_operator && 
!has_distinct},
+     * even though the child outputs a different (hash) distribution — and the 
LE inserted
+     * before the streaming preagg must partition by {@code grouping_exprs} so 
a local
+     * partial reduce actually collapses same-key rows.  Using the default 
(child
+     * distribution) here would scatter same-group rows across instances and 
degrade the
+     * preagg to a no-op, also breaking row-arrival order at downstream 
merge-finalize.
+     *
+     * @param childIndex which child
+     * @param followedByShuffled whether the chain at this node is followed by 
a shuffled
+     *        operator (mirrors BE's {@code _followed_by_shuffled_operator})
+     */
+    protected List<Expr> getLocalExchangeDistributeExprs(int childIndex, 
boolean followedByShuffled) {
+        return getChildDistributeExprList(childIndex);
+    }
+
     /**
      * Returns the operator's own semantically-defined partition expressions
      * (e.g. GROUP BY exprs for aggregation, PARTITION BY exprs for analytic).
diff --git 
a/fe/fe-core/src/test/java/org/apache/doris/qe/LocalExchangePlannerTest.java 
b/fe/fe-core/src/test/java/org/apache/doris/qe/LocalExchangePlannerTest.java
index 52536492ef9..55b3c62b87c 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/qe/LocalExchangePlannerTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/qe/LocalExchangePlannerTest.java
@@ -449,6 +449,43 @@ public class LocalExchangePlannerTest extends 
TestWithFeService implements PlanS
                                                 && 
!le.getDistributeExprLists().isEmpty()))));
     }
 
+    @Test
+    public void testStreamingAggHashShuffleUsesGroupingExprs() throws 
Exception {
+        // Regression for: FE-planned LE(LOCAL_HASH) under a streaming partial 
agg used
+        // the child's table distribution (e.g. `k1`) instead of the grouping 
keys
+        // (e.g. `k2`).  BE's 
AggSinkOperatorX/StreamingAggOperatorX::update_operator
+        // picks `_partition_exprs = grouping_exprs` when the chain is NOT 
followed by
+        // a shuffled operator (the common case where the streaming preagg 
sits at
+        // fragment root with only a cross-fragment HASH ExchangeSink above).  
Using
+        // child distribution instead scatters same-group rows across N 
instances,
+        // turning the partial preagg into a no-op and corrupting row-arrival 
order at
+        // downstream merge-finalize (manifests as e.g. non-deterministic
+        // group_concat / py_json_array_agg output).
+        //
+        // Table t1 is DISTRIBUTED BY HASH(k1).  GROUP BY k2 forces a 
cross-fragment
+        // exchange (and thus a two-phase aggregation): the streaming partial 
agg lives
+        // in the lower fragment, with an FE-inserted LE(LOCAL_HASH) below it. 
 The fix
+        // makes that LE carry [k2] (the grouping key) rather than [k1] (the 
child
+        // table distribution).
+        setupLocalShuffleSession(null);
+        assertPlanShape(
+                "select k2, count(*) from test.t1 group by k2",
+                anyTree(
+                        agg(
+                                localExchange(LOCAL_HASH,
+                                        localExchange(PT, olapScan("t1")))
+                                        .where(le -> {
+                                            
List<org.apache.doris.analysis.Expr> exprs =
+                                                    
le.getDistributeExprLists();
+                                            if (exprs == null || exprs.size() 
!= 1) {
+                                                return false;
+                                            }
+                                            org.apache.doris.analysis.Expr e = 
exprs.get(0);
+                                            return e instanceof 
org.apache.doris.analysis.SlotRef
+                                                    && 
"k2".equals(((org.apache.doris.analysis.SlotRef) e).getCol());
+                                        }))));
+    }
+
     @Test
     public void testRequireHashSatisfyAllHashShuffleTypes() {
         LocalExchangeNode.LocalExchangeTypeRequire requireHash = 
LocalExchangeNode.LocalExchangeTypeRequire.requireHash();


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to