This is an automated email from the ASF dual-hosted git repository. yiguolei pushed a commit to branch spill_repartition in repository https://gitbox.apache.org/repos/asf/doris.git
commit 9094b6be856a6054c35ea2b53a013a75857c6008 Author: yiguolei <[email protected]> AuthorDate: Tue Mar 3 21:26:29 2026 +0800 simplify agg code --- .../partitioned_aggregation_source_operator.cpp | 26 ++++++++-------------- .../exec/partitioned_aggregation_source_operator.h | 3 +-- be/src/vec/spill/spill_file.h | 3 --- 3 files changed, 10 insertions(+), 22 deletions(-) diff --git a/be/src/pipeline/exec/partitioned_aggregation_source_operator.cpp b/be/src/pipeline/exec/partitioned_aggregation_source_operator.cpp index 28eab411051..fd3f6c284be 100644 --- a/be/src/pipeline/exec/partitioned_aggregation_source_operator.cpp +++ b/be/src/pipeline/exec/partitioned_aggregation_source_operator.cpp @@ -271,9 +271,8 @@ Status PartitionedAggSourceOperatorX::get_block(RuntimeState* state, vectorized: // Phase 2: Recover blocks from disk into _blocks (batch of ~8MB). if (local_state._blocks.empty() && local_state._current_partition.spill_file) { - bool has_data = false; - RETURN_IF_ERROR(local_state._recover_blocks_from_partition( - state, local_state._current_partition, has_data)); + RETURN_IF_ERROR( + local_state._recover_blocks_from_partition(state, local_state._current_partition)); // Return empty block to yield to pipeline scheduler. // Pipeline task will check memory and call revoke_memory if needed. *eos = false; @@ -344,9 +343,7 @@ void PartitionedAggLocalState::_init_partition_queue() { } Status PartitionedAggLocalState::_recover_blocks_from_partition(RuntimeState* state, - AggSpillPartitionInfo& partition, - bool& has_data) { - has_data = false; + AggSpillPartitionInfo& partition) { size_t accumulated_bytes = 0; if (!partition.spill_file || state->is_cancelled()) { return Status::OK(); @@ -369,7 +366,6 @@ Status PartitionedAggLocalState::_recover_blocks_from_partition(RuntimeState* st RETURN_IF_ERROR(_current_reader->read(&block, &eos)); if (!block.empty()) { - has_data = true; accumulated_bytes += block.allocated_bytes(); _blocks.emplace_back(std::move(block)); @@ -523,16 +519,12 @@ Status PartitionedAggLocalState::flush_and_repartition(RuntimeState* state) { // 4. Push non-empty sub-partitions into the work queue. for (int i = 0; i < static_cast<int>(p._partition_count); ++i) { - if (output_spill_files[i] && output_spill_files[i]->get_written_bytes() > 0) { - _partition_queue.emplace_back(std::move(output_spill_files[i]), new_level); - // Metrics - COUNTER_UPDATE(_total_partition_spills, 1); - if (new_level > _max_partition_level_seen) { - _max_partition_level_seen = new_level; - COUNTER_SET(_max_partition_level, int64_t(_max_partition_level_seen)); - } - } else if (output_spill_files[i]) { - ExecEnv::GetInstance()->spill_file_mgr()->delete_spill_file(output_spill_files[i]); + _partition_queue.emplace_back(std::move(output_spill_files[i]), new_level); + // Metrics + COUNTER_UPDATE(_total_partition_spills, 1); + if (new_level > _max_partition_level_seen) { + _max_partition_level_seen = new_level; + COUNTER_SET(_max_partition_level, int64_t(_max_partition_level_seen)); } } diff --git a/be/src/pipeline/exec/partitioned_aggregation_source_operator.h b/be/src/pipeline/exec/partitioned_aggregation_source_operator.h index 2ce67cbf5f8..ff40cd411d4 100644 --- a/be/src/pipeline/exec/partitioned_aggregation_source_operator.h +++ b/be/src/pipeline/exec/partitioned_aggregation_source_operator.h @@ -89,8 +89,7 @@ private: /// Read up to vectorized::SpillFile::MAX_SPILL_WRITE_BATCH_MEM bytes from `partition.spill_files` into /// `_blocks`. Returns has_data=true if any blocks were read. /// Consumes and deletes exhausted spill files from the partition. - Status _recover_blocks_from_partition(RuntimeState* state, AggSpillPartitionInfo& partition, - bool& has_data); + Status _recover_blocks_from_partition(RuntimeState* state, AggSpillPartitionInfo& partition); // ── State ────────────────────────────────────────────────────────── std::unique_ptr<RuntimeState> _runtime_state; diff --git a/be/src/vec/spill/spill_file.h b/be/src/vec/spill/spill_file.h index 9cfbd210ddb..82b253d8d07 100644 --- a/be/src/vec/spill/spill_file.h +++ b/be/src/vec/spill/spill_file.h @@ -76,9 +76,6 @@ public: int64_t get_written_bytes() const { return _total_written_bytes; } - /// Returns true if any data has been written. - bool has_data() const { return _total_written_bytes > 0; } - /// Returns true after the writer has been closed (all data flushed). bool ready_for_reading() const { return _ready_for_reading; } --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
