This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch spill_repartition
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/spill_repartition by this push:
     new fdb355fd4fc simplify probe code enhancement probe operator
fdb355fd4fc is described below

commit fdb355fd4fc0c09add4e3df85bb14058ccf1ae86
Author: yiguolei <[email protected]>
AuthorDate: Tue Mar 3 14:22:26 2026 +0800

    simplify probe code enhancement probe operator
---
 .../partitioned_aggregation_source_operator.cpp    | 120 +++++-----
 .../exec/partitioned_aggregation_source_operator.h |   7 +-
 .../exec/partitioned_hash_join_probe_operator.cpp  | 241 +++++++--------------
 .../exec/partitioned_hash_join_probe_operator.h    |  42 +++-
 be/src/vec/spill/spill_file.h                      |   2 +-
 5 files changed, 163 insertions(+), 249 deletions(-)

diff --git a/be/src/pipeline/exec/partitioned_aggregation_source_operator.cpp 
b/be/src/pipeline/exec/partitioned_aggregation_source_operator.cpp
index 059ae72e369..28eab411051 100644
--- a/be/src/pipeline/exec/partitioned_aggregation_source_operator.cpp
+++ b/be/src/pipeline/exec/partitioned_aggregation_source_operator.cpp
@@ -177,7 +177,7 @@ bool PartitionedAggSourceOperatorX::is_shuffled_operator() 
const {
 
 size_t PartitionedAggSourceOperatorX::revocable_mem_size(RuntimeState* state) 
const {
     auto& local_state = get_local_state(state);
-    if (!local_state._shared_state->_is_spilled || 
!local_state._current_partition.has_data()) {
+    if (!local_state._shared_state->_is_spilled || 
!local_state._current_partition.spill_file) {
         return 0;
     }
 
@@ -214,9 +214,7 @@ Status 
PartitionedAggSourceOperatorX::revoke_memory(RuntimeState* state) {
                               
PrettyPrinter::print_bytes(local_state._estimate_memory_usage));
 
     // Flush hash table + repartition remaining spill files of the current 
partition.
-    RETURN_IF_ERROR(local_state.flush_and_repartition(state,
-                                                      
local_state._current_partition.spill_file,
-                                                      
local_state._current_partition.level));
+    RETURN_IF_ERROR(local_state.flush_and_repartition(state));
     local_state._current_partition = AggSpillPartitionInfo {};
     local_state._need_to_setup_partition = true;
     return Status::OK();
@@ -265,19 +263,17 @@ Status 
PartitionedAggSourceOperatorX::get_block(RuntimeState* state, vectorized:
 
         VLOG_DEBUG << fmt::format(
                 "Query:{}, agg source:{}, task:{}, setup partition level:{}, "
-                "queue remaining:{}, partition bytes:{}",
+                "queue remaining:{}",
                 print_id(state->query_id()), node_id(), state->task_id(),
-                local_state._current_partition.level, 
local_state._partition_queue.size(),
-                
PrettyPrinter::print_bytes(local_state._current_partition.total_bytes()));
+                local_state._current_partition.level, 
local_state._partition_queue.size());
         local_state._need_to_setup_partition = false;
     }
 
     // Phase 2: Recover blocks from disk into _blocks (batch of ~8MB).
     if (local_state._blocks.empty() && 
local_state._current_partition.spill_file) {
         bool has_data = false;
-        status = local_state._recover_blocks_from_partition(state, 
local_state._current_partition,
-                                                            has_data);
-        RETURN_IF_ERROR(status);
+        RETURN_IF_ERROR(local_state._recover_blocks_from_partition(
+                state, local_state._current_partition, has_data));
         // Return empty block to yield to pipeline scheduler.
         // Pipeline task will check memory and call revoke_memory if needed.
         *eos = false;
@@ -309,8 +305,7 @@ Status 
PartitionedAggSourceOperatorX::get_block(RuntimeState* state, vectorized:
     auto* runtime_state = local_state._runtime_state.get();
     
local_state._shared_state->_in_mem_shared_state->aggregate_data_container->init_once();
     bool inner_eos = false;
-    status = _agg_source_operator->get_block(runtime_state, block, &inner_eos);
-    RETURN_IF_ERROR(status);
+    RETURN_IF_ERROR(_agg_source_operator->get_block(runtime_state, block, 
&inner_eos));
 
     if (inner_eos) {
         auto* source_local_state =
@@ -318,8 +313,7 @@ Status 
PartitionedAggSourceOperatorX::get_block(RuntimeState* state, vectorized:
         local_state.update_profile<true>(source_local_state->custom_profile());
 
         // Current partition fully output. Reset hash table, pop next 
partition.
-        status = _agg_source_operator->reset_hash_table(runtime_state);
-        RETURN_IF_ERROR(status);
+        RETURN_IF_ERROR(_agg_source_operator->reset_hash_table(runtime_state));
 
         local_state._current_partition = AggSpillPartitionInfo {};
         local_state._estimate_memory_usage = 0;
@@ -340,13 +334,11 @@ Status 
PartitionedAggSourceOperatorX::get_block(RuntimeState* state, vectorized:
 
 void PartitionedAggLocalState::_init_partition_queue() {
     for (auto& spill_file : _shared_state->_spill_partitions) {
-        if (spill_file && spill_file->has_data()) {
-            _partition_queue.emplace_back(std::move(spill_file), /*level=*/0);
-            // Track metrics: each queued partition counts as one spill at 
level 0
-            COUNTER_UPDATE(_total_partition_spills, 1);
-            _max_partition_level_seen = 0;
-            COUNTER_SET(_max_partition_level, 
int64_t(_max_partition_level_seen));
-        }
+        _partition_queue.emplace_back(std::move(spill_file), /*level=*/0);
+        // Track metrics: each queued partition counts as one spill at level 0
+        COUNTER_UPDATE(_total_partition_spills, 1);
+        _max_partition_level_seen = 0;
+        COUNTER_SET(_max_partition_level, int64_t(_max_partition_level_seen));
     }
     _shared_state->_spill_partitions.clear();
 }
@@ -356,57 +348,42 @@ Status 
PartitionedAggLocalState::_recover_blocks_from_partition(RuntimeState* st
                                                                 bool& 
has_data) {
     has_data = false;
     size_t accumulated_bytes = 0;
+    if (!partition.spill_file || state->is_cancelled()) {
+        return Status::OK();
+    }
 
-    auto exception_catch_func = [&]() -> Status {
-        if (!partition.spill_file || state->is_cancelled()) {
-            return Status::OK();
-        }
-
-        // Create or reuse a persistent reader for this file
-        if (!_current_reader) {
-            _current_reader = partition.spill_file->create_reader(state, 
operator_profile());
-            RETURN_IF_ERROR(_current_reader->open());
-        }
+    // Create or reuse a persistent reader for this file
+    if (!_current_reader) {
+        _current_reader = partition.spill_file->create_reader(state, 
operator_profile());
+        RETURN_IF_ERROR(_current_reader->open());
+    }
 
-        bool eos = false;
+    bool eos = false;
 
-        while (!eos && !state->is_cancelled()) {
-            vectorized::Block block;
-            
DBUG_EXECUTE_IF("fault_inject::partitioned_agg_source::recover_spill_data", {
-                return Status::Error<INTERNAL_ERROR>(
-                        "fault_inject partitioned_agg_source 
recover_spill_data failed");
-            });
-            RETURN_IF_ERROR(_current_reader->read(&block, &eos));
+    while (!eos && !state->is_cancelled()) {
+        vectorized::Block block;
+        
DBUG_EXECUTE_IF("fault_inject::partitioned_agg_source::recover_spill_data", {
+            return Status::Error<INTERNAL_ERROR>(
+                    "fault_inject partitioned_agg_source recover_spill_data 
failed");
+        });
+        RETURN_IF_ERROR(_current_reader->read(&block, &eos));
 
-            if (!block.empty()) {
-                has_data = true;
-                accumulated_bytes += block.allocated_bytes();
-                _blocks.emplace_back(std::move(block));
+        if (!block.empty()) {
+            has_data = true;
+            accumulated_bytes += block.allocated_bytes();
+            _blocks.emplace_back(std::move(block));
 
-                if (accumulated_bytes >= state->spill_buffer_size_bytes()) {
-                    return Status::OK();
-                }
+            if (accumulated_bytes >= state->spill_buffer_size_bytes()) {
+                return Status::OK();
             }
         }
+    }
 
-        if (eos) {
-            _current_reader.reset();
-            partition.spill_file.reset();
-        }
-        return Status::OK();
-    };
-
-    DBUG_EXECUTE_IF("fault_inject::partitioned_agg_source::submit_func", {
-        return Status::Error<INTERNAL_ERROR>(
-                "fault_inject partitioned_agg_source submit_func failed");
-    });
-
-    auto status = [&]() { RETURN_IF_CATCH_EXCEPTION({ return 
exception_catch_func(); }); }();
-    LOG_IF(WARNING, !status.ok()) << fmt::format(
-            "Query:{}, agg source:{}, task:{}, recover exception:{}", 
print_id(state->query_id()),
-            _parent->node_id(), state->task_id(), status.to_string());
-
-    return status;
+    if (eos) {
+        _current_reader.reset();
+        partition.spill_file.reset();
+    }
+    return Status::OK();
 }
 
 Status PartitionedAggLocalState::setup_in_memory_agg_op(RuntimeState* state) {
@@ -461,10 +438,9 @@ Status 
PartitionedAggLocalState::flush_hash_table_to_sub_spill_files(RuntimeStat
     return Status::OK();
 }
 
-Status PartitionedAggLocalState::flush_and_repartition(
-        RuntimeState* state, vectorized::SpillFileSPtr& remaining_spill_file, 
int level) {
+Status PartitionedAggLocalState::flush_and_repartition(RuntimeState* state) {
     auto& p = _parent->cast<PartitionedAggSourceOperatorX>();
-    const int new_level = level + 1;
+    const int new_level = _current_partition.level + 1;
 
     if (new_level >= p._repartition_max_depth) {
         return Status::InternalError(
@@ -476,7 +452,8 @@ Status PartitionedAggLocalState::flush_and_repartition(
     VLOG_DEBUG << fmt::format(
             "Query:{}, agg source:{}, task:{}, flush_and_repartition: "
             "flushing hash table and repartitioning remaining spill file at 
level {} -> {}",
-            print_id(state->query_id()), p.node_id(), state->task_id(), level, 
new_level);
+            print_id(state->query_id()), p.node_id(), state->task_id(), 
_current_partition.level,
+            new_level);
 
     {
         auto* source_local_state =
@@ -531,15 +508,16 @@ Status PartitionedAggLocalState::flush_and_repartition(
             RETURN_IF_ERROR(_repartitioner.repartition(state, _current_reader, 
&done));
         }
         // reader is reset by repartitioner on completion
-    } else if (remaining_spill_file) {
+    } else if (_current_partition.spill_file) {
         // No partial read — repartition the entire file from scratch.
         bool done = false;
         while (!done && !state->is_cancelled()) {
-            RETURN_IF_ERROR(_repartitioner.repartition(state, 
remaining_spill_file, &done));
+            RETURN_IF_ERROR(
+                    _repartitioner.repartition(state, 
_current_partition.spill_file, &done));
         }
     }
     _current_reader.reset();
-    remaining_spill_file.reset();
+    _current_partition.spill_file.reset();
 
     RETURN_IF_ERROR(_repartitioner.finalize());
 
diff --git a/be/src/pipeline/exec/partitioned_aggregation_source_operator.h 
b/be/src/pipeline/exec/partitioned_aggregation_source_operator.h
index 1647c2db19e..2ce67cbf5f8 100644
--- a/be/src/pipeline/exec/partitioned_aggregation_source_operator.h
+++ b/be/src/pipeline/exec/partitioned_aggregation_source_operator.h
@@ -47,10 +47,6 @@ struct AggSpillPartitionInfo {
     AggSpillPartitionInfo() = default;
     AggSpillPartitionInfo(vectorized::SpillFileSPtr s, int lvl)
             : spill_file(std::move(s)), level(lvl) {}
-
-    bool has_data() const { return spill_file && 
spill_file->get_written_bytes() > 0; }
-
-    int64_t total_bytes() const { return spill_file ? 
spill_file->get_written_bytes() : 0; }
 };
 
 class PartitionedAggLocalState MOCK_REMOVE(final)
@@ -81,8 +77,7 @@ public:
     /// unread spill files from `remaining_spill_files`, and push resulting 
sub-partitions into
     /// `_partition_queue`. After this call the hash table is reset and
     /// `remaining_spill_files` is cleared.
-    Status flush_and_repartition(RuntimeState* state,
-                                 vectorized::SpillFileSPtr& 
remaining_spill_file, int level);
+    Status flush_and_repartition(RuntimeState* state);
 
 private:
     friend class PartitionedAggSourceOperatorX;
diff --git a/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp 
b/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp
index 6db83c92638..f5949ab6436 100644
--- a/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp
+++ b/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp
@@ -220,19 +220,13 @@ Status 
PartitionedHashJoinProbeLocalState::spill_probe_blocks(RuntimeState* stat
                 "fault_inject partitioned_hash_join_probe "
                 "spill_probe_blocks canceled");
     });
-    size_t not_revoked_size = 0;
     auto& p = _parent->cast<PartitionedHashJoinProbeOperatorX>();
     for (uint32_t partition_index = 0; partition_index != p._partition_count; 
++partition_index) {
         auto& blocks = _probe_blocks[partition_index];
         auto& partitioned_block = _partitioned_blocks[partition_index];
-        if (partitioned_block) {
-            const auto size = partitioned_block->allocated_bytes();
-            if (size >= vectorized::SpillFile::MIN_SPILL_WRITE_BATCH_MEM) {
-                blocks.emplace_back(partitioned_block->to_block());
-                partitioned_block.reset();
-            } else {
-                not_revoked_size += size;
-            }
+        if (partitioned_block && !partitioned_block->empty()) {
+            blocks.emplace_back(partitioned_block->to_block());
+            partitioned_block.reset();
         }
 
         if (blocks.empty()) {
@@ -272,8 +266,6 @@ Status 
PartitionedHashJoinProbeLocalState::spill_probe_blocks(RuntimeState* stat
         }
     }
 
-    COUNTER_SET(_probe_blocks_bytes, int64_t(not_revoked_size));
-
     VLOG_DEBUG << fmt::format(
             "Query:{}, hash join probe:{}, task:{},"
             " spill_probe_blocks done",
@@ -309,15 +301,9 @@ bool PartitionedHashJoinProbeLocalState::is_blockable() 
const {
 }
 
 Status PartitionedHashJoinProbeLocalState::recover_build_blocks_from_partition(
-        RuntimeState* state, JoinSpillPartitionInfo& partition_info,
-        bool& recovered_data_available) {
-    // recovered_data_available signals the caller that this synchronous
-    // recovery call produced (or attempted to produce) at least one batch of
-    // recovered build data stored in _recovered_build_block. Callers should
-    // yield and consume the recovered data before continuing further
-    // processing for this partition.
-    recovered_data_available = false;
+        RuntimeState* state, JoinSpillPartitionInfo& partition_info) {
     if (!partition_info.build_file) {
+        // Build file is already exhausted for this partition.
         return Status::OK();
     }
     SCOPED_TIMER(_recovery_build_timer);
@@ -348,27 +334,20 @@ Status 
PartitionedHashJoinProbeLocalState::recover_build_blocks_from_partition(
             RETURN_IF_ERROR(_recovered_build_block->merge(std::move(block)));
         }
         if (_recovered_build_block->allocated_bytes() >= 
state->spill_buffer_size_bytes()) {
-            recovered_data_available = true;
             return Status::OK(); // yield — buffer full, more data may remain
         }
     }
-    // File exhausted
+    // Build file fully consumed.
     RETURN_IF_ERROR(_current_build_reader->close());
     _current_build_reader.reset();
     partition_info.build_file.reset();
-    recovered_data_available = true;
     return Status::OK();
 }
 
 Status PartitionedHashJoinProbeLocalState::recover_probe_blocks_from_partition(
-        RuntimeState* state, JoinSpillPartitionInfo& partition_info,
-        bool& recovered_data_available) {
-    // recovered_data_available: this call performs synchronous reads of probe
-    // blocks into _queue_probe_blocks up to a batch threshold and sets this
-    // flag to true when data is available. Caller should return/yield and
-    // then consume the recovered probe blocks before continuing.
-    recovered_data_available = false;
+        RuntimeState* state, JoinSpillPartitionInfo& partition_info) {
     if (!partition_info.probe_file) {
+        // Probe file is already exhausted for this partition.
         return Status::OK();
     }
 
@@ -391,15 +370,13 @@ Status 
PartitionedHashJoinProbeLocalState::recover_probe_blocks_from_partition(
             _queue_probe_blocks.emplace_back(std::move(block));
         }
         if (read_size >= state->spill_buffer_size_bytes()) {
-            recovered_data_available = true;
             return Status::OK(); // yield — enough data read
         }
     }
-    // File exhausted
+    // Probe file fully consumed.
     RETURN_IF_ERROR(_current_probe_reader->close());
     _current_probe_reader.reset();
     partition_info.probe_file.reset();
-    recovered_data_available = true;
     return Status::OK();
 }
 
@@ -454,17 +431,17 @@ Status 
PartitionedHashJoinProbeLocalState::repartition_current_partition(
             RETURN_IF_ERROR(_repartitioner.repartition(state, 
_current_build_reader, &done));
         }
         // reader is reset by repartitioner on completion
-    } else if (partition.build_file && 
partition.build_file->ready_for_reading()) {
+    } else if (partition.build_file) {
         // No partial read — repartition the entire file from scratch.
         bool done = false;
         while (!done && !state->is_cancelled()) {
             RETURN_IF_ERROR(_repartitioner.repartition(state, 
partition.build_file, &done));
         }
     }
+    RETURN_IF_ERROR(_repartitioner.finalize());
     _recovered_build_block.reset();
-    partition.build_file.reset();
     _current_build_reader.reset(); // clear any leftover reader state
-    RETURN_IF_ERROR(_repartitioner.finalize());
+    partition.build_file.reset();
 
     // Repartition probe files
     std::vector<vectorized::SpillFileSPtr> probe_output_spill_files;
@@ -481,19 +458,19 @@ Status 
PartitionedHashJoinProbeLocalState::repartition_current_partition(
 
         RETURN_IF_ERROR(_repartitioner.setup_output(state, 
probe_output_spill_files));
 
-        if (partition.probe_file->ready_for_reading()) {
-            bool done = false;
-            while (!done && !state->is_cancelled()) {
-                RETURN_IF_ERROR(_repartitioner.repartition(state, 
partition.probe_file, &done));
-            }
-            partition.probe_file.reset();
+        bool done = false;
+        while (!done && !state->is_cancelled()) {
+            RETURN_IF_ERROR(_repartitioner.repartition(state, 
partition.probe_file, &done));
         }
-        _current_probe_reader.reset();
+        partition.probe_file.reset();
+
         RETURN_IF_ERROR(_repartitioner.finalize());
+        _current_probe_reader.reset();
     }
 
     // Push all sub-partitions into work queue; build/probe emptiness is 
handled
-    // later during recovery.
+    // later during recovery.  New sub-partitions start with build_finished =
+    // probe_finished = false (via constructor).
     for (int i = 0; i < static_cast<int>(p._partition_count); ++i) {
         
_spill_partition_queue.emplace_back(std::move(build_output_spill_files[i]),
                                             
std::move(probe_output_spill_files[i]), new_level);
@@ -595,6 +572,7 @@ Status 
PartitionedHashJoinProbeOperatorX::push(RuntimeState* state, vectorized::
         RETURN_IF_ERROR(partitioned_blocks[i]->add_rows(input_block, 
partition_indexes[i].data(),
                                                         
partition_indexes[i].data() + count));
 
+        // A minimum batch size check is needed here; otherwise reading the 
blocks back from file during the probe phase would produce too many small blocks.
         if (partitioned_blocks[i]->rows() > 0 &&
             (eos || partitioned_blocks[i]->allocated_bytes() >=
                             vectorized::SpillFile::MIN_SPILL_WRITE_BATCH_MEM)) 
{
@@ -669,6 +647,7 @@ Status 
PartitionedHashJoinProbeOperatorX::_setup_internal_operators_from_partiti
 
     RETURN_IF_ERROR(_inner_sink_operator->sink(
             local_state._shared_state->_inner_runtime_state.get(), &block, 
true));
+    local_state._current_partition.build_finished = true;
     VLOG_DEBUG << fmt::format(
             "Query:{}, hash join probe:{}, task:{},"
             " internal build from partition (level:{}) finished, rows:{}, 
memory usage:{}",
@@ -688,39 +667,19 @@ Status 
PartitionedHashJoinProbeOperatorX::pull(doris::RuntimeState* state,
     // (including the original "level-0" ones) is accessed uniformly via the 
queue.
     if (!local_state._spill_queue_initialized) {
         DCHECK(local_state._child_eos) << "pull() with is_spilled=true called 
before child EOS";
+        // There may still be blocks buffered in the partitioned blocks or 
probe blocks. Flush them to disk.
+        RETURN_IF_ERROR(local_state.spill_probe_blocks(state));
+        // Close all probe writers so that SpillFile metadata (part_count, 
etc.)
+        // is finalized and the files become readable. Without this the readers
+        // would see _part_count == 0 and return no data.
+        for (auto& writer : local_state._probe_writers) {
+            if (writer) {
+                RETURN_IF_ERROR(writer->close());
+            }
+        }
         for (uint32_t i = 0; i < _partition_count; ++i) {
             auto& build_file = 
local_state._shared_state->_spilled_build_groups[i];
             auto& probe_file = local_state._probe_spilling_groups[i];
-            auto& probe_writer = local_state._probe_writers[i];
-
-            // Flush any remaining in-memory probe blocks for this partition.
-            auto& partitioned_block = local_state._partitioned_blocks[i];
-            if (partitioned_block && !partitioned_block->empty()) {
-                
local_state._probe_blocks[i].emplace_back(partitioned_block->to_block());
-                partitioned_block.reset();
-            }
-            for (auto& pb : local_state._probe_blocks[i]) {
-                if (pb.empty()) continue;
-                // Lazy-create SpillFile + Writer for this probe partition
-                if (!probe_writer) {
-                    auto relative_path = fmt::format(
-                            "{}/{}-{}-{}-{}", print_id(state->query_id()), 
"hash_probe", node_id(),
-                            state->task_id(), 
ExecEnv::GetInstance()->spill_file_mgr()->next_id());
-                    
RETURN_IF_ERROR(ExecEnv::GetInstance()->spill_file_mgr()->create_spill_file(
-                            relative_path, probe_file));
-                    RETURN_IF_ERROR(probe_file->create_writer(state, 
local_state.operator_profile(),
-                                                              probe_writer));
-                }
-                RETURN_IF_ERROR(probe_writer->write_block(state, pb));
-            }
-            local_state._probe_blocks[i].clear();
-
-            // Close probe writer to finalize probe SpillFile for reading.
-            if (probe_writer) {
-                RETURN_IF_ERROR(probe_writer->close());
-                probe_writer.reset();
-            }
-
             // Transfer SpillFiles into JoinSpillPartitionInfo unconditionally.
             
local_state._spill_partition_queue.emplace_back(std::move(build_file),
                                                             
std::move(probe_file), 0);
@@ -746,36 +705,23 @@ Status 
PartitionedHashJoinProbeOperatorX::_pull_from_spill_queue(
     *eos = false;
 
     if (local_state._need_to_setup_queue_partition) {
-        // If the queue is empty AND we don't already have a current
-        // partition or recovered build data pending, we're done. It's
-        // possible the queue was popped on a previous scheduling and the
-        // recovered data is waiting to be processed; in that case we must
-        // not return EOS here.
+        // No more partitions to process and no active partition — EOS.
         if (local_state._spill_partition_queue.empty() &&
-            local_state._current_partition.build_exhausted() &&
-            !(local_state._recovered_build_block &&
-              local_state._recovered_build_block->rows() > 0)) {
+            (!local_state._current_partition.is_valid() ||
+             local_state._current_partition.probe_finished)) {
             *eos = true;
             return Status::OK();
         }
 
-        // Pop next partition to process
-        // Invariant: we only pop a new queue partition when 
`_current_partition`
-        // is exhausted and we have not already recovered build data for the
-        // current partition. `build_file` being null is used as a sentinel
-        // for whether the partition's build data has been fully consumed.
-        // After reading all build data into `_recovered_build_block`, the
-        // file is reset. In that state we must NOT pop the next queued 
partition
-        // — we still need to setup internal operators from the recovered data.
-        if (local_state._current_partition.build_exhausted() &&
-            !local_state._recovered_build_block) {
-            DCHECK(!local_state._spill_partition_queue.empty());
-
+        // Pop next partition to process.
+        // Invariant: we only pop when there is no active current partition and
+        // no pending recovered build data waiting to be consumed.
+        if (!local_state._current_partition.is_valid() ||
+            local_state._current_partition.probe_finished) {
             local_state._current_partition = 
std::move(local_state._spill_partition_queue.front());
             local_state._spill_partition_queue.pop_front();
             local_state._recovered_build_block.reset();
             local_state._queue_probe_blocks.clear();
-
             VLOG_DEBUG << fmt::format(
                     "Query:{}, hash join probe:{}, task:{},"
                     " processing queue partition at level:{}, queue 
remaining:{}",
@@ -784,61 +730,31 @@ Status 
PartitionedHashJoinProbeOperatorX::_pull_from_spill_queue(
                     local_state._spill_partition_queue.size());
         }
 
-        // If we've already recovered build data into `_recovered_build_block`
-        // (from a previous scheduling), we must only perform the internal
-        // operator setup once we've recovered *all* build data for this
-        // partition. The recovery logic resets build_file when all data has
-        // been read. Only when build_file is null may we safely build
-        // the hash table from the accumulated `_recovered_build_block`.
-        //
-        // Note: if `_recovered_build_block` contains rows but build_file
-        // is non-null, more build data may still arrive; continue recovery
-        // instead of starting the build now.
-        if (local_state._recovered_build_block && 
local_state._recovered_build_block->rows() > 0 &&
-            local_state._current_partition.build_exhausted()) {
-            
RETURN_IF_ERROR(_setup_internal_operators_from_partition(local_state, state));
-            local_state._need_to_setup_queue_partition = false;
-        } else {
-            // Recover build data from the partition's build stream
-            bool recovered_data_available = false;
-            RETURN_IF_ERROR(local_state.recover_build_blocks_from_partition(
-                    state, local_state._current_partition, 
recovered_data_available));
-
-            if (local_state._current_partition.build_exhausted()) {
-                // Build stream is exhausted (possibly empty from the 
beginning),
-                // setup inner operators now and continue to probe phase.
-                
RETURN_IF_ERROR(_setup_internal_operators_from_partition(local_state, state));
-                local_state._need_to_setup_queue_partition = false;
-                return Status::OK();
-            }
-
-            if (recovered_data_available) {
-                // We read some build data — yield so it can be consumed in 
the next
-                // scheduling of this operator.
-                return Status::OK();
-            }
-
-            // All build data recovered. Yield once so the pipeline scheduler 
can
-            // re-evaluate reservations based on the recovered data before we
-            // actually build the hash table. On the next scheduling of this
-            // operator we will fall through and call
-            // `_setup_internal_operators_from_partition`.
-            return Status::OK();
+        // Continue recovering build data while there is unread build file.
+        if (local_state._current_partition.build_file) {
+            // Recover the next batch of build data, then yield so it can be
+            // consumed before recovery continues in the next scheduling slice.
+            return local_state.recover_build_blocks_from_partition(state,
+                                                                   
local_state._current_partition);
         }
+        RETURN_IF_ERROR(_setup_internal_operators_from_partition(local_state, 
state));
+        local_state._current_partition.build_finished = true;
+        local_state._need_to_setup_queue_partition = false;
+        return Status::OK();
     }
 
     // Probe phase: feed probe blocks from the current partition's probe stream
+    // into the inner probe operator.
     bool in_mem_eos = false;
     auto* runtime_state = 
local_state._shared_state->_inner_runtime_state.get();
     auto& probe_blocks = local_state._queue_probe_blocks;
 
     while (_inner_probe_operator->need_more_input_data(runtime_state)) {
         if (probe_blocks.empty()) {
-            bool recovered_data_available = false;
-            RETURN_IF_ERROR(local_state.recover_probe_blocks_from_partition(
-                    state, local_state._current_partition, 
recovered_data_available));
-            if (!recovered_data_available) {
-                // No more probe data — send eos to inner probe
+            // Try to recover more probe blocks. If the probe stream is
+            // finished (probe_file == nullptr) and no blocks are buffered,
+            // we send EOS to the inner probe operator.
+            if (!local_state._current_partition.probe_file) {
                 vectorized::Block block;
                 RETURN_IF_ERROR(_inner_probe_operator->push(runtime_state, 
&block, true));
                 VLOG_DEBUG << fmt::format(
@@ -847,9 +763,12 @@ Status 
PartitionedHashJoinProbeOperatorX::_pull_from_spill_queue(
                         print_id(state->query_id()), node_id(), 
state->task_id(),
                         local_state._current_partition.level);
                 break;
-            } else {
-                return Status::OK();
             }
+
+            // Recover more probe data, then yield so the pipeline scheduler
+            // re-schedules us to push the recovered blocks.
+            return local_state.recover_probe_blocks_from_partition(state,
+                                                                   
local_state._current_partition);
         }
 
         auto block = std::move(probe_blocks.back());
@@ -858,9 +777,7 @@ Status 
PartitionedHashJoinProbeOperatorX::_pull_from_spill_queue(
             RETURN_IF_ERROR(_inner_probe_operator->push(runtime_state, &block, 
false));
         }
     }
-
     RETURN_IF_ERROR(_inner_probe_operator->pull(runtime_state, output_block, 
&in_mem_eos));
-
     if (in_mem_eos) {
         VLOG_DEBUG << fmt::format(
                 "Query:{}, hash join probe:{}, task:{},"
@@ -868,8 +785,10 @@ Status 
PartitionedHashJoinProbeOperatorX::_pull_from_spill_queue(
                 print_id(state->query_id()), node_id(), state->task_id(),
                 local_state._current_partition.level);
         local_state.update_profile_from_inner();
+        local_state._current_partition.probe_finished = true;
 
-        // Reset for next queue entry
+        // Reset for next queue entry — default-constructed partition has
+        // is_valid() == false, signaling "no partition in progress".
         local_state._current_partition = JoinSpillPartitionInfo {};
         local_state._need_to_setup_queue_partition = true;
         local_state._queue_probe_blocks.clear();
@@ -900,6 +819,15 @@ bool 
PartitionedHashJoinProbeOperatorX::need_more_input_data(RuntimeState* state
 // This is the memory that can be freed if we choose to revoke and repartition 
the current
 size_t PartitionedHashJoinProbeOperatorX::revocable_mem_size(RuntimeState* 
state) const {
     auto& local_state = get_local_state(state);
+    if (!local_state._shared_state->_is_spilled) {
+        return 0;
+    }
+    if (!local_state._current_partition.is_valid() ||
+        local_state._current_partition.build_finished) {
+        // Either no partition is currently being processed, or the current
+        // partition has already finished building its hash table.
+        return 0;
+    }
     size_t mem_size = 0;
     auto& probe_blocks = local_state._probe_blocks;
     for (uint32_t i = 0; i < _partition_count; ++i) {
@@ -945,13 +873,8 @@ size_t 
PartitionedHashJoinProbeOperatorX::get_reserve_mem_size(RuntimeState* sta
     // Baseline reservation is one block of spill I/O.
     size_t size_to_reserve = state->minimum_operator_memory_required_bytes();
 
-    // Queue path: _need_to_setup_queue_partition is true AND recovered build
-    // data is fully available (build_file exhausted), meaning the next pull()
-    // will call _setup_internal_operators_from_partition.
-    const bool about_to_build =
-            local_state._need_to_setup_queue_partition && 
local_state._recovered_build_block &&
-            local_state._recovered_build_block->rows() > 0 &&
-            local_state._current_partition.build_exhausted(); // file 
exhausted → ready to build
+    const bool about_to_build = local_state._current_partition.is_valid() &&
+                                !local_state._current_partition.build_finished;
 
     if (about_to_build) {
         // Estimate rows that will land in the hash table so we can reserve
@@ -1028,7 +951,10 @@ Status 
PartitionedHashJoinProbeOperatorX::revoke_memory(RuntimeState* state) {
         // Probe-data accumulation phase: spill in-memory probe blocks to disk.
         return local_state.spill_probe_blocks(state);
     }
-
+    if (!local_state._current_partition.is_valid() ||
+        local_state._current_partition.build_finished) {
+        return Status::OK();
+    }
     // Recovery/build phase: repartition the current partition's in-memory 
build
     // data so the hash table build can be deferred to a smaller sub-partition.
     return local_state.revoke_build_data(state);
@@ -1039,17 +965,6 @@ Status 
PartitionedHashJoinProbeOperatorX::get_block(RuntimeState* state, vectori
     *eos = false;
     auto& local_state = get_local_state(state);
     const auto is_spilled = local_state._shared_state->_is_spilled;
-#ifndef NDEBUG
-    Defer eos_check_defer([&] {
-        if (*eos) {
-            LOG(INFO) << fmt::format(
-                    "Query:{}, hash join probe:{}, task:{}, child eos:{}, need 
spill:{}",
-                    print_id(state->query_id()), node_id(), state->task_id(),
-                    local_state._child_eos, is_spilled);
-        }
-    });
-#endif
-
     Defer defer([&]() {
         COUNTER_SET(local_state._memory_usage_reserved,
                     int64_t(local_state.estimate_memory_usage()));
diff --git a/be/src/pipeline/exec/partitioned_hash_join_probe_operator.h 
b/be/src/pipeline/exec/partitioned_hash_join_probe_operator.h
index b911b5c6983..a375fe1d1f7 100644
--- a/be/src/pipeline/exec/partitioned_hash_join_probe_operator.h
+++ b/be/src/pipeline/exec/partitioned_hash_join_probe_operator.h
@@ -45,18 +45,46 @@ class PartitionedHashJoinProbeOperatorX;
 /// during recovery. For multi-level spill, when a partition is too large to 
fit in
 /// memory, it gets repartitioned into FANOUT sub-partitions, each represented 
by a
 /// new JoinSpillPartitionInfo at level + 1.
+///
+/// Lifecycle of partition progress:
+///   build_file == nullptr:
+///     - all build-side spill data has been read from disk for this partition
+///   probe_file == nullptr:
+///     - all probe-side spill data has been read from disk for this partition
+///   build_finished = true:
+///     - build side has completed hash table construction
+///   probe_finished = true:
+///     - probe side has completed probing all rows for this partition
+///
+/// A default-constructed instance has is_valid() == false, representing "no 
partition".
+/// New sub-partitions created by repartitioning start with both flags = false 
and
+/// initialized = true.
 struct JoinSpillPartitionInfo {
+    // build_file == nullptr means all build data has been read from disk.
     vectorized::SpillFileSPtr build_file;
+    // probe_file == nullptr means all probe data has been read from disk.
     vectorized::SpillFileSPtr probe_file;
     int level = 0; // 0 = original level-0 partition, 1+ = repartitioned 
sub-partition
 
+    // Set once all build data has been read from disk and the hash table has been built.
+    bool build_finished = false;
+    // Set once all probe data has been read from disk and every row has been probed against the hash table.
+    bool probe_finished = false;
+    // Whether this struct currently represents an active queue partition.
+    bool initialized = false;
+
     JoinSpillPartitionInfo() = default;
     JoinSpillPartitionInfo(vectorized::SpillFileSPtr build, 
vectorized::SpillFileSPtr probe,
                            int lvl)
-            : build_file(std::move(build)), probe_file(std::move(probe)), 
level(lvl) {}
-
-    bool build_exhausted() const { return !build_file; }
-    bool probe_exhausted() const { return !probe_file; }
+            : build_file(std::move(build)),
+              probe_file(std::move(probe)),
+              level(lvl),
+              initialized(true) {}
+
+    /// Returns true if this struct currently represents an active partition 
entry
+    /// from the spill queue. A default-constructed partition is "invalid" and
+    /// serves as a sentinel meaning "no partition is being processed".
+    bool is_valid() const { return initialized; }
 };
 
 class PartitionedHashJoinProbeLocalState MOCK_REMOVE(final)
@@ -85,12 +113,10 @@ public:
 
     /// Recover build blocks from a JoinSpillPartitionInfo's build stream (for 
multi-level recovery).
     Status recover_build_blocks_from_partition(RuntimeState* state,
-                                               JoinSpillPartitionInfo& 
partition_info,
-                                               bool& recovered_data_available);
+                                               JoinSpillPartitionInfo& 
partition_info);
     /// Recover probe blocks from a JoinSpillPartitionInfo's probe stream (for 
multi-level recovery).
     Status recover_probe_blocks_from_partition(RuntimeState* state,
-                                               JoinSpillPartitionInfo& 
partition_info,
-                                               bool& recovered_data_available);
+                                               JoinSpillPartitionInfo& 
partition_info);
 
     /// Repartition the current partition's build and probe streams into 
FANOUT sub-partitions
     /// and push them into _spill_partition_queue for subsequent processing.
diff --git a/be/src/vec/spill/spill_file.h b/be/src/vec/spill/spill_file.h
index bfce76d1031..9cfbd210ddb 100644
--- a/be/src/vec/spill/spill_file.h
+++ b/be/src/vec/spill/spill_file.h
@@ -58,7 +58,7 @@ using SpillFileReaderUPtr = std::unique_ptr<SpillFileReader>;
 class SpillFile {
 public:
     // to avoid too many small file writes
-    static constexpr size_t MIN_SPILL_WRITE_BATCH_MEM = 32 * 1024;
+    static constexpr size_t MIN_SPILL_WRITE_BATCH_MEM = 512 * 1024;
     static constexpr size_t MAX_SPILL_WRITE_BATCH_MEM = 32 * 1024 * 1024;
 
     /// @param data_dir       The spill storage directory (disk) selected by 
SpillFileManager.


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to