This is an automated email from the ASF dual-hosted git repository.

924060929 pushed a commit to branch fe_local_shuffle_rebase_wip
in repository https://gitbox.apache.org/repos/asf/doris.git

commit a2760874f8c7edd6ea7598663f82d5850f82566b
Author: 924060929 <[email protected]>
AuthorDate: Wed May 20 17:23:52 2026 +0800

    [revert](local shuffle) Roll back the sender_count std::max patch
    
    The previous commit ef4ea66a66f raised _create_deferred_local_exchangers'
    sender_count to std::max(upstream_pipe->num_tasks(), _num_instances) on
    the theory that shared_state is shared across all _num_instances fragment
    instances and each contributes one close.
    
    That breaks the inverse case: when upstream_pipe legitimately has 1 task
    (e.g. PASSTHROUGH below a serial scan inside a single fragment instance),
    only 1 close actually happens, so the raised initial count never reaches
    zero — downstream sources block on SHUFFLE_DATA_DEPENDENCY forever.
    Surfaced by mtmv_up_down_job_p0.load.test_upgrade_downgrade_prepare_job_mtmv
    (MV refresh hung at RUNNING in build 949402): plan dump shows LE id=5
    PASSTHROUGH with _num_senders=6, _running_sink_operators=5 — only one
    close arrived, exchanger never finalized.
    
    Restore the original sender_count = info.upstream_pipe->num_tasks().
    The dictionary-refresh "stop grace fail" diagnosis from the previous
    commit was wrong about the close-count math; that hang has a different
    root cause and needs separate investigation.
---
 be/src/exec/pipeline/pipeline_fragment_context.cpp | 14 +-------------
 1 file changed, 1 insertion(+), 13 deletions(-)

diff --git a/be/src/exec/pipeline/pipeline_fragment_context.cpp b/be/src/exec/pipeline/pipeline_fragment_context.cpp
index fac2649b2c6..1cbc06f5e2d 100644
--- a/be/src/exec/pipeline/pipeline_fragment_context.cpp
+++ b/be/src/exec/pipeline/pipeline_fragment_context.cpp
@@ -718,19 +718,7 @@ Status PipelineFragmentContext::_build_pipelines(ObjectPool* pool, const Descrip
 
 Status PipelineFragmentContext::_create_deferred_local_exchangers() {
     for (auto& info : _deferred_exchangers) {
-        // sender_count = total number of sink instances that will call sub_running_sink_operators
-        // on this exchanger's shared_state.  shared_state is shared across all `_num_instances`
-        // fragment instances on this BE — each instance contributes `upstream_pipe->num_tasks()`
-        // sink tasks.  When the upstream pipeline has a serial source (e.g. POOLING OlapScan,
-        // serial Exchange), `num_tasks()` stays at 1 and `_propagate_local_exchange_num_tasks`
-        // Pass 1 deliberately does not raise it, so the close-count is `_num_instances`, not 1.
-        // Without the max(_, _num_instances), sub_running_sink_operators decrements past zero,
-        // the exchanger never sees `_running_sink_operators == 0`, downstream sources block
-        // forever on SHUFFLE_DATA_DEPENDENCY, and the query hangs — eventually triggering a
-        // mem-tracker leak FATAL on the leftover blocks the exchanger still holds.
-        // Mirrors BE-planned `_add_local_exchange_impl` (line ~1023) which already uses
-        // `std::max(cur_pipe->num_tasks(), _num_instances)`.
-        const int sender_count = std::max(info.upstream_pipe->num_tasks(), _num_instances);
+        const int sender_count = info.upstream_pipe->num_tasks();
         switch (info.partition_type) {
         case TLocalPartitionType::LOCAL_EXECUTION_HASH_SHUFFLE:
         case TLocalPartitionType::GLOBAL_EXECUTION_HASH_SHUFFLE:

