This is an automated email from the ASF dual-hosted git repository. huajianlan pushed a commit to branch fe_local_shuffle in repository https://gitbox.apache.org/repos/asf/doris.git
commit ccf7874857b18d7f14897fe82c7d7278995c32bd Author: 924060929 <[email protected]> AuthorDate: Sun Mar 29 10:26:21 2026 +0800 [fix](local shuffle) restore deferred exchanger num_tasks raise for non-scan serial operators Restore the upstream pipeline num_tasks raise in _create_deferred_local_exchangers that was removed in 80541b0dd93. The raise is needed because serial non-scan operators (e.g., UNPARTITIONED Exchange) reduce the pipeline's num_tasks to _parallel_instances (typically 1), but downstream operators (Agg, Sort, Union, Join build sinks) need _num_instances tasks so every fragment instance can create and inject shared state. Without the raise, instances 1+ fail with "must set shared state, in AGGREGATION_OPERATOR/SORT_OPERATOR/UNION_OPERATOR". The raise is now conditional: skip when the serial operator is a scan source (pooling scan), where 1 sender is correct and PassthroughExchanger(1, N) handles the fan-out properly. This avoids the profile count inflation that motivated the original removal. --- be/src/exec/pipeline/pipeline_fragment_context.cpp | 27 ++++++++++++++++++---- 1 file changed, 23 insertions(+), 4 deletions(-) diff --git a/be/src/exec/pipeline/pipeline_fragment_context.cpp b/be/src/exec/pipeline/pipeline_fragment_context.cpp index 29a9a04c477..148422b61ef 100644 --- a/be/src/exec/pipeline/pipeline_fragment_context.cpp +++ b/be/src/exec/pipeline/pipeline_fragment_context.cpp @@ -668,10 +668,29 @@ Status PipelineFragmentContext::_build_pipelines(ObjectPool* pool, const Descrip Status PipelineFragmentContext::_create_deferred_local_exchangers() { for (auto& info : _deferred_exchangers) { - // upstream_pipe->num_tasks() is already the correct sender count: - // _create_tree_helper sets it via add_operator (serial operators call set_num_tasks with - // _parallel_instances, non-serial operators leave it at the value inherited from - // add_pipeline). No adjustment is needed here. 
+ // Raise upstream pipeline's num_tasks to _num_instances when a serial + // non-scan operator (e.g., UNPARTITIONED Exchange) reduced it below + // _num_instances. This is needed because downstream pipelines + // (join build, agg, sort, union, etc.) need _num_instances tasks so + // every fragment instance can create/inject shared state. Without the + // raise, instances 1+ have no upstream task and operators fail with + // "must set shared state". + // + // Exception: do NOT raise when the serial operator is a scan source + // (pooling scan). For pooling scan, 1 sender is correct — + // PassthroughExchanger(1, N) handles the 1→N fan-out properly. + if (info.upstream_pipe->num_tasks() < _num_instances) { + bool has_serial_scan = false; + for (auto& op : info.upstream_pipe->operators()) { + if (op->is_serial_operator() && op->is_source()) { + has_serial_scan = true; + break; + } + } + if (!has_serial_scan) { + info.upstream_pipe->set_num_tasks(_num_instances); + } + } const int sender_count = info.upstream_pipe->num_tasks(); switch (info.partition_type) { case TLocalPartitionType::LOCAL_EXECUTION_HASH_SHUFFLE: --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org For additional commands, e-mail: commits-help@doris.apache.org
