This is an automated email from the ASF dual-hosted git repository.

huajianlan pushed a commit to branch fe_local_shuffle
in repository https://gitbox.apache.org/repos/asf/doris.git

commit ccf7874857b18d7f14897fe82c7d7278995c32bd
Author: 924060929 <[email protected]>
AuthorDate: Sun Mar 29 10:26:21 2026 +0800

    [fix](local shuffle) restore deferred exchanger num_tasks raise for non-scan serial operators
    
    Restore the upstream pipeline num_tasks raise in _create_deferred_local_exchangers
    that was removed in 80541b0dd93.  The raise is needed because serial non-scan
    operators (e.g., UNPARTITIONED Exchange) reduce the pipeline's num_tasks to
    _parallel_instances (typically 1), but downstream operators (Agg, Sort, Union,
    Join build sinks) need _num_instances tasks so every fragment instance can
    create and inject shared state.  Without the raise, instances 1+ fail with
    "must set shared state, in AGGREGATION_OPERATOR/SORT_OPERATOR/UNION_OPERATOR".
    
    The raise is now conditional: skip when the serial operator is a scan source
    (pooling scan), where 1 sender is correct and PassthroughExchanger(1, N)
    handles the fan-out properly.  This avoids the profile count inflation that
    motivated the original removal.
---
 be/src/exec/pipeline/pipeline_fragment_context.cpp | 27 ++++++++++++++++++----
 1 file changed, 23 insertions(+), 4 deletions(-)

diff --git a/be/src/exec/pipeline/pipeline_fragment_context.cpp b/be/src/exec/pipeline/pipeline_fragment_context.cpp
index 29a9a04c477..148422b61ef 100644
--- a/be/src/exec/pipeline/pipeline_fragment_context.cpp
+++ b/be/src/exec/pipeline/pipeline_fragment_context.cpp
@@ -668,10 +668,29 @@ Status PipelineFragmentContext::_build_pipelines(ObjectPool* pool, const Descrip
 
 Status PipelineFragmentContext::_create_deferred_local_exchangers() {
     for (auto& info : _deferred_exchangers) {
-        // upstream_pipe->num_tasks() is already the correct sender count:
-        // _create_tree_helper sets it via add_operator (serial operators call set_num_tasks with
-        // _parallel_instances, non-serial operators leave it at the value inherited from
-        // add_pipeline).  No adjustment is needed here.
-        // add_pipeline).  No adjustment is needed here.
+        // Raise upstream pipeline's num_tasks to _num_instances when a serial
+        // non-scan operator (e.g., UNPARTITIONED Exchange) reduced it below
+        // _num_instances.  This is needed because downstream pipelines
+        // (join build, agg, sort, union, etc.) need _num_instances tasks so
+        // every fragment instance can create/inject shared state.  Without the
+        // raise, instances 1+ have no upstream task and operators fail with
+        // "must set shared state".
+        //
+        // Exception: do NOT raise when the serial operator is a scan source
+        // (pooling scan).  For pooling scan, 1 sender is correct —
+        // PassthroughExchanger(1, N) handles the 1→N fan-out properly.
+        if (info.upstream_pipe->num_tasks() < _num_instances) {
+            bool has_serial_scan = false;
+            for (auto& op : info.upstream_pipe->operators()) {
+                if (op->is_serial_operator() && op->is_source()) {
+                    has_serial_scan = true;
+                    break;
+                }
+            }
+            if (!has_serial_scan) {
+                info.upstream_pipe->set_num_tasks(_num_instances);
+            }
+        }
         const int sender_count = info.upstream_pipe->num_tasks();
         switch (info.partition_type) {
         case TLocalPartitionType::LOCAL_EXECUTION_HASH_SHUFFLE:

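For readers who want the decision rule in isolation, the conditional raise in the hunk above can be sketched as a small standalone function. This is only an illustration under stated assumptions: `OpInfo` and `raised_num_tasks` are hypothetical names that do not exist in Doris, and the two booleans stand in for the results of `is_serial_operator()` and `is_source()` on each upstream operator.

```cpp
#include <algorithm>
#include <vector>

// Hypothetical stand-in for one operator of the upstream pipeline.
// The fields mirror op->is_serial_operator() and op->is_source() in the diff.
struct OpInfo {
    bool is_serial;
    bool is_source;
};

// Returns the task count the upstream pipeline would end up with:
//  - unchanged when it already has at least num_instances tasks;
//  - unchanged when a serial source operator is present (the pooling scan
//    case, where a single sender is intended);
//  - raised to num_instances otherwise.
inline int raised_num_tasks(int num_tasks, int num_instances,
                            const std::vector<OpInfo>& ops) {
    if (num_tasks >= num_instances) {
        return num_tasks;
    }
    const bool has_serial_source =
            std::any_of(ops.begin(), ops.end(), [](const OpInfo& op) {
                return op.is_serial && op.is_source;
            });
    return has_serial_source ? num_tasks : num_instances;
}
```

With 4 instances, a pipeline reduced to 1 task by a serial operator that is not a source would be raised to 4, while a pipeline whose serial operator is a source would stay at 1 and rely on the exchanger for the fan-out.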

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
