This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch spill_repartition
in repository https://gitbox.apache.org/repos/asf/doris.git

commit 9094b6be856a6054c35ea2b53a013a75857c6008
Author: yiguolei <[email protected]>
AuthorDate: Tue Mar 3 21:26:29 2026 +0800

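    Simplify partitioned aggregation spill code

    Remove the `has_data` out-parameter from
    PartitionedAggLocalState::_recover_blocks_from_partition() and the
    SpillFile::has_data() accessor. flush_and_repartition() now pushes every
    repartitioned sub-partition onto the work queue, instead of enqueuing only
    sub-partitions whose spill file wrote bytes and deleting the empty ones.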
---
 .../partitioned_aggregation_source_operator.cpp    | 26 ++++++++--------------
 .../exec/partitioned_aggregation_source_operator.h |  3 +--
 be/src/vec/spill/spill_file.h                      |  3 ---
 3 files changed, 10 insertions(+), 22 deletions(-)

diff --git a/be/src/pipeline/exec/partitioned_aggregation_source_operator.cpp 
b/be/src/pipeline/exec/partitioned_aggregation_source_operator.cpp
index 28eab411051..fd3f6c284be 100644
--- a/be/src/pipeline/exec/partitioned_aggregation_source_operator.cpp
+++ b/be/src/pipeline/exec/partitioned_aggregation_source_operator.cpp
@@ -271,9 +271,8 @@ Status 
PartitionedAggSourceOperatorX::get_block(RuntimeState* state, vectorized:
 
     // Phase 2: Recover blocks from disk into _blocks (batch of ~8MB).
     if (local_state._blocks.empty() && 
local_state._current_partition.spill_file) {
-        bool has_data = false;
-        RETURN_IF_ERROR(local_state._recover_blocks_from_partition(
-                state, local_state._current_partition, has_data));
+        RETURN_IF_ERROR(
+                local_state._recover_blocks_from_partition(state, 
local_state._current_partition));
         // Return empty block to yield to pipeline scheduler.
         // Pipeline task will check memory and call revoke_memory if needed.
         *eos = false;
@@ -344,9 +343,7 @@ void PartitionedAggLocalState::_init_partition_queue() {
 }
 
 Status PartitionedAggLocalState::_recover_blocks_from_partition(RuntimeState* 
state,
-                                                                
AggSpillPartitionInfo& partition,
-                                                                bool& 
has_data) {
-    has_data = false;
+                                                                
AggSpillPartitionInfo& partition) {
     size_t accumulated_bytes = 0;
     if (!partition.spill_file || state->is_cancelled()) {
         return Status::OK();
@@ -369,7 +366,6 @@ Status 
PartitionedAggLocalState::_recover_blocks_from_partition(RuntimeState* st
         RETURN_IF_ERROR(_current_reader->read(&block, &eos));
 
         if (!block.empty()) {
-            has_data = true;
             accumulated_bytes += block.allocated_bytes();
             _blocks.emplace_back(std::move(block));
 
@@ -523,16 +519,12 @@ Status 
PartitionedAggLocalState::flush_and_repartition(RuntimeState* state) {
 
     // 4. Push non-empty sub-partitions into the work queue.
     for (int i = 0; i < static_cast<int>(p._partition_count); ++i) {
-        if (output_spill_files[i] && 
output_spill_files[i]->get_written_bytes() > 0) {
-            _partition_queue.emplace_back(std::move(output_spill_files[i]), 
new_level);
-            // Metrics
-            COUNTER_UPDATE(_total_partition_spills, 1);
-            if (new_level > _max_partition_level_seen) {
-                _max_partition_level_seen = new_level;
-                COUNTER_SET(_max_partition_level, 
int64_t(_max_partition_level_seen));
-            }
-        } else if (output_spill_files[i]) {
-            
ExecEnv::GetInstance()->spill_file_mgr()->delete_spill_file(output_spill_files[i]);
+        _partition_queue.emplace_back(std::move(output_spill_files[i]), 
new_level);
+        // Metrics
+        COUNTER_UPDATE(_total_partition_spills, 1);
+        if (new_level > _max_partition_level_seen) {
+            _max_partition_level_seen = new_level;
+            COUNTER_SET(_max_partition_level, 
int64_t(_max_partition_level_seen));
         }
     }
 
diff --git a/be/src/pipeline/exec/partitioned_aggregation_source_operator.h 
b/be/src/pipeline/exec/partitioned_aggregation_source_operator.h
index 2ce67cbf5f8..ff40cd411d4 100644
--- a/be/src/pipeline/exec/partitioned_aggregation_source_operator.h
+++ b/be/src/pipeline/exec/partitioned_aggregation_source_operator.h
@@ -89,8 +89,7 @@ private:
     /// Read up to vectorized::SpillFile::MAX_SPILL_WRITE_BATCH_MEM bytes from 
`partition.spill_files` into
     /// `_blocks`. Returns has_data=true if any blocks were read.
     /// Consumes and deletes exhausted spill files from the partition.
-    Status _recover_blocks_from_partition(RuntimeState* state, 
AggSpillPartitionInfo& partition,
-                                          bool& has_data);
+    Status _recover_blocks_from_partition(RuntimeState* state, 
AggSpillPartitionInfo& partition);
 
     // ── State ──────────────────────────────────────────────────────────
     std::unique_ptr<RuntimeState> _runtime_state;
diff --git a/be/src/vec/spill/spill_file.h b/be/src/vec/spill/spill_file.h
index 9cfbd210ddb..82b253d8d07 100644
--- a/be/src/vec/spill/spill_file.h
+++ b/be/src/vec/spill/spill_file.h
@@ -76,9 +76,6 @@ public:
 
     int64_t get_written_bytes() const { return _total_written_bytes; }
 
-    /// Returns true if any data has been written.
-    bool has_data() const { return _total_written_bytes > 0; }
-
     /// Returns true after the writer has been closed (all data flushed).
     bool ready_for_reading() const { return _ready_for_reading; }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to