This is an automated email from the ASF dual-hosted git repository. 924060929 pushed a commit to branch fe_local_shuffle_rebase in repository https://gitbox.apache.org/repos/asf/doris.git
commit f5c1a6c989768c23acd96e15a604e684438aeecc
Author: 924060929 <[email protected]>
AuthorDate: Wed May 20 17:23:52 2026 +0800

    [revert](local shuffle) Roll back the sender_count std::max patch

    The previous commit ef4ea66a66f raised _create_deferred_local_exchangers'
    sender_count to std::max(upstream_pipe->num_tasks(), _num_instances) on
    the theory that shared_state is shared across all _num_instances fragment
    instances and each contributes one close.

    That breaks the inverse case: when upstream_pipe legitimately has 1 task
    (e.g. PASSTHROUGH below a serial scan inside a single fragment instance),
    only 1 close actually happens, so the raised initial count never reaches
    zero and downstream sources block on SHUFFLE_DATA_DEPENDENCY forever.

    Surfaced by mtmv_up_down_job_p0.load.test_upgrade_downgrade_prepare_job_mtmv
    (MV refresh hung at RUNNING in build 949402): the plan dump shows LE id=5
    PASSTHROUGH with _num_senders=6, _running_sink_operators=5, i.e. only one
    close arrived and the exchanger never finalized.

    Restore the original sender_count = info.upstream_pipe->num_tasks().

    The dictionary-refresh "stop grace fail" diagnosis from the previous commit
    was wrong about the close-count math; that hang has a different root cause
    and needs separate investigation.
---
 be/src/exec/pipeline/pipeline_fragment_context.cpp | 14 +-------------
 1 file changed, 1 insertion(+), 13 deletions(-)

diff --git a/be/src/exec/pipeline/pipeline_fragment_context.cpp b/be/src/exec/pipeline/pipeline_fragment_context.cpp
index fac2649b2c6..1cbc06f5e2d 100644
--- a/be/src/exec/pipeline/pipeline_fragment_context.cpp
+++ b/be/src/exec/pipeline/pipeline_fragment_context.cpp
@@ -718,19 +718,7 @@ Status PipelineFragmentContext::_build_pipelines(ObjectPool* pool, const Descrip
 
 Status PipelineFragmentContext::_create_deferred_local_exchangers() {
     for (auto& info : _deferred_exchangers) {
-        // sender_count = total number of sink instances that will call sub_running_sink_operators
-        // on this exchanger's shared_state. shared_state is shared across all `_num_instances`
-        // fragment instances on this BE — each instance contributes `upstream_pipe->num_tasks()`
-        // sink tasks. When the upstream pipeline has a serial source (e.g. POOLING OlapScan,
-        // serial Exchange), `num_tasks()` stays at 1 and `_propagate_local_exchange_num_tasks`
-        // Pass 1 deliberately does not raise it, so the close-count is `_num_instances`, not 1.
-        // Without the max(_, _num_instances), sub_running_sink_operators decrements past zero,
-        // the exchanger never sees `_running_sink_operators == 0`, downstream sources block
-        // forever on SHUFFLE_DATA_DEPENDENCY, and the query hangs — eventually triggering a
-        // mem-tracker leak FATAL on the leftover blocks the exchanger still holds.
-        // Mirrors BE-planned `_add_local_exchange_impl` (line ~1023) which already uses
-        // `std::max(cur_pipe->num_tasks(), _num_instances)`.
-        const int sender_count = std::max(info.upstream_pipe->num_tasks(), _num_instances);
+        const int sender_count = info.upstream_pipe->num_tasks();
         switch (info.partition_type) {
         case TLocalPartitionType::LOCAL_EXECUTION_HASH_SHUFFLE:
         case TLocalPartitionType::GLOBAL_EXECUTION_HASH_SHUFFLE:
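The close-count arithmetic in the commit message can be checked with a small model. This is a hypothetical sketch, not the Doris implementation: `ExchangerCloseCounter`, `simulate`, `sender_count_reverted` and `sender_count_restored` are illustrative names, and the only behavior assumed is that the exchanger finalizes when its running-sink counter reaches exactly zero, with one decrement per closing sink task. The value `_num_instances = 6` used in the checks is inferred from `_num_senders=6` in the plan dump together with the single upstream task; it is not stated directly.

```cpp
#include <algorithm>
#include <cassert>

// Hypothetical model (not Doris code): only the close-count arithmetic of a
// local exchanger's shared state is reproduced here.
struct ExchangerCloseCounter {
    int running_sink_operators;  // stands in for _running_sink_operators
    bool finalized = false;      // set when the count reaches exactly zero

    explicit ExchangerCloseCounter(int sender_count)
            : running_sink_operators(sender_count) {}

    // One call per closing sink task, mirroring sub_running_sink_operators.
    void sub_running_sink_operators() {
        --running_sink_operators;
        if (running_sink_operators == 0) {
            finalized = true;  // downstream sources would be unblocked here
        }
    }
};

// Formula from the reverted commit ef4ea66a66f.
int sender_count_reverted(int upstream_num_tasks, int num_instances) {
    return std::max(upstream_num_tasks, num_instances);
}

// Formula restored by this commit: num_instances is deliberately ignored.
int sender_count_restored(int upstream_num_tasks, int /*num_instances*/) {
    return upstream_num_tasks;
}

// Applies `closes` sink closes to a counter initialised with `sender_count`,
// writes the leftover count to *remaining and reports whether the exchanger
// was finalized.
bool simulate(int sender_count, int closes, int* remaining) {
    assert(closes >= 0 && remaining != nullptr);
    ExchangerCloseCounter counter(sender_count);
    for (int i = 0; i < closes; ++i) {
        counter.sub_running_sink_operators();
    }
    *remaining = counter.running_sink_operators;
    return counter.finalized;
}
```

With one upstream task and one actual close, the reverted formula starts the counter at 6 and leaves it at 5 without ever finalizing, which is the `_num_senders=6, _running_sink_operators=5` state from the dump; the restored `upstream_pipe->num_tasks()` starts at 1 and reaches zero on that single close. The model intentionally says nothing about the multi-instance case the previous commit was worried about, which the message notes still needs separate investigation.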
