This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch spill_repartition
in repository https://gitbox.apache.org/repos/asf/doris.git

commit 3d35204aae3bc7a35d0394b12f70254729fad659
Author: yiguolei <[email protected]>
AuthorDate: Wed Mar 4 15:11:56 2026 +0800

    refactor code and add unit test
---
 .../exec/partitioned_aggregation_sink_operator.cpp |   89 +-
 .../exec/partitioned_aggregation_sink_operator.h   |   33 +-
 .../partitioned_aggregation_source_operator.cpp    |   20 +-
 .../exec/partitioned_aggregation_source_operator.h |   20 +-
 be/test/vec/spill/spill_file_test.cpp              | 1040 ++++++++++++++++++++
 5 files changed, 1122 insertions(+), 80 deletions(-)

diff --git a/be/src/pipeline/exec/partitioned_aggregation_sink_operator.cpp b/be/src/pipeline/exec/partitioned_aggregation_sink_operator.cpp
index fd785f78335..c2e36dba2b8 100644
--- a/be/src/pipeline/exec/partitioned_aggregation_sink_operator.cpp
+++ b/be/src/pipeline/exec/partitioned_aggregation_sink_operator.cpp
@@ -51,15 +51,15 @@ Status 
PartitionedAggSinkLocalState::init(doris::RuntimeState* state,
 
     auto& parent = Base::_parent->template cast<Parent>();
     _spill_writers.resize(parent._partition_count);
-    RETURN_IF_ERROR(setup_in_memory_agg_op(state));
+    RETURN_IF_ERROR(_setup_in_memory_agg_op(state));
 
     for (const auto& probe_expr_ctx : 
Base::_shared_state->_in_mem_shared_state->probe_expr_ctxs) {
-        
key_columns_.emplace_back(probe_expr_ctx->root()->data_type()->create_column());
+        
_key_columns.emplace_back(probe_expr_ctx->root()->data_type()->create_column());
     }
     for (const auto& aggregate_evaluator :
          Base::_shared_state->_in_mem_shared_state->aggregate_evaluators) {
-        
value_data_types_.emplace_back(aggregate_evaluator->function()->get_serialized_type());
-        
value_columns_.emplace_back(aggregate_evaluator->function()->create_serialize_column());
+        
_value_data_types.emplace_back(aggregate_evaluator->function()->get_serialized_type());
+        
_value_columns.emplace_back(aggregate_evaluator->function()->create_serialize_column());
     }
     _rows_in_partitions.assign(parent._partition_count, 0);
     return Status::OK();
@@ -102,7 +102,7 @@ void PartitionedAggSinkLocalState::_init_counters() {
     update_profile_from_inner_profile<spilled>(name, custom_profile(), 
child_profile)
 
 template <bool spilled>
-void PartitionedAggSinkLocalState::update_profile(RuntimeProfile* 
child_profile) {
+void PartitionedAggSinkLocalState::_update_profile(RuntimeProfile* 
child_profile) {
     UPDATE_PROFILE("MemoryUsageHashTable");
     UPDATE_PROFILE("MemoryUsageSerializeKeyArena");
     UPDATE_PROFILE("BuildTime");
@@ -159,7 +159,7 @@ Status 
PartitionedAggSinkOperatorX::sink(doris::RuntimeState* state, vectorized:
         }
     } else {
         auto* sink_local_state = 
local_state._runtime_state->get_sink_local_state();
-        local_state.update_profile<false>(sink_local_state->custom_profile());
+        local_state._update_profile<false>(sink_local_state->custom_profile());
     }
 
     // finally perform EOS bookkeeping
@@ -190,7 +190,7 @@ Status 
PartitionedAggSinkOperatorX::sink(doris::RuntimeState* state, vectorized:
 
 Status PartitionedAggSinkOperatorX::revoke_memory(RuntimeState* state) {
     auto& local_state = get_local_state(state);
-    return local_state.revoke_memory(state);
+    return local_state._revoke_memory(state);
 }
 
 size_t PartitionedAggSinkOperatorX::revocable_mem_size(RuntimeState* state) 
const {
@@ -204,7 +204,7 @@ size_t 
PartitionedAggSinkOperatorX::revocable_mem_size(RuntimeState* state) cons
     return size > state->spill_min_revocable_mem() ? size : 0;
 }
 
-Status PartitionedAggSinkLocalState::setup_in_memory_agg_op(RuntimeState* 
state) {
+Status PartitionedAggSinkLocalState::_setup_in_memory_agg_op(RuntimeState* 
state) {
     _runtime_state = RuntimeState::create_unique(
             state->fragment_instance_id(), state->query_id(), 
state->fragment_id(),
             state->query_options(), TQueryGlobals {}, state->exec_env(), 
state->get_query_ctx());
@@ -244,18 +244,19 @@ size_t 
PartitionedAggSinkOperatorX::get_reserve_mem_size(RuntimeState* state, bo
 }
 
 template <typename HashTableCtxType, typename KeyType>
-Status PartitionedAggSinkLocalState::to_block(HashTableCtxType& context, 
std::vector<KeyType>& keys,
-                                              
std::vector<vectorized::AggregateDataPtr>& values,
-                                              const 
vectorized::AggregateDataPtr null_key_data) {
+Status PartitionedAggSinkLocalState::_to_block(HashTableCtxType& context,
+                                               std::vector<KeyType>& keys,
+                                               
std::vector<vectorized::AggregateDataPtr>& values,
+                                               const 
vectorized::AggregateDataPtr null_key_data) {
     SCOPED_TIMER(_spill_serialize_hash_table_timer);
-    context.insert_keys_into_columns(keys, key_columns_, 
(uint32_t)keys.size());
+    context.insert_keys_into_columns(keys, _key_columns, 
(uint32_t)keys.size());
 
     if (null_key_data) {
         // only one key of group by support wrap null key
         // here need additional processing logic on the null key / value
-        CHECK(key_columns_.size() == 1);
-        CHECK(key_columns_[0]->is_nullable());
-        key_columns_[0]->insert_data(nullptr, 0);
+        CHECK(_key_columns.size() == 1);
+        CHECK(_key_columns[0]->is_nullable());
+        _key_columns[0]->insert_data(nullptr, 0);
 
         values.emplace_back(null_key_data);
     }
@@ -267,33 +268,33 @@ Status 
PartitionedAggSinkLocalState::to_block(HashTableCtxType& context, std::ve
                 ->serialize_to_column(
                         values,
                         
Base::_shared_state->_in_mem_shared_state->offsets_of_aggregate_states[i],
-                        value_columns_[i], values.size());
+                        _value_columns[i], values.size());
     }
 
     vectorized::ColumnsWithTypeAndName key_columns_with_schema;
-    for (int i = 0; i < key_columns_.size(); ++i) {
+    for (int i = 0; i < _key_columns.size(); ++i) {
         key_columns_with_schema.emplace_back(
-                std::move(key_columns_[i]),
+                std::move(_key_columns[i]),
                 
Base::_shared_state->_in_mem_shared_state->probe_expr_ctxs[i]->root()->data_type(),
                 
Base::_shared_state->_in_mem_shared_state->probe_expr_ctxs[i]->root()->expr_name());
     }
-    key_block_ = key_columns_with_schema;
+    _key_block = key_columns_with_schema;
 
     vectorized::ColumnsWithTypeAndName value_columns_with_schema;
-    for (int i = 0; i < value_columns_.size(); ++i) {
+    for (int i = 0; i < _value_columns.size(); ++i) {
         value_columns_with_schema.emplace_back(
-                std::move(value_columns_[i]), value_data_types_[i],
+                std::move(_value_columns[i]), _value_data_types[i],
                 
Base::_shared_state->_in_mem_shared_state->aggregate_evaluators[i]
                         ->function()
                         ->get_name());
     }
-    value_block_ = value_columns_with_schema;
+    _value_block = value_columns_with_schema;
 
-    for (const auto& column : key_block_.get_columns_with_type_and_name()) {
-        block_.insert(column);
+    for (const auto& column : _key_block.get_columns_with_type_and_name()) {
+        _block.insert(column);
     }
-    for (const auto& column : value_block_.get_columns_with_type_and_name()) {
-        block_.insert(column);
+    for (const auto& column : _value_block.get_columns_with_type_and_name()) {
+        _block.insert(column);
     }
     return Status::OK();
 }
@@ -303,7 +304,7 @@ Status PartitionedAggSinkLocalState::_spill_partition(
         RuntimeState* state, HashTableCtxType& context, size_t partition_idx,
         std::vector<KeyType>& keys, std::vector<vectorized::AggregateDataPtr>& 
values,
         const vectorized::AggregateDataPtr null_key_data, bool is_last) {
-    auto status = to_block(context, keys, values, null_key_data);
+    auto status = _to_block(context, keys, values, null_key_data);
     RETURN_IF_ERROR(status);
 
     if (is_last) {
@@ -335,7 +336,7 @@ Status PartitionedAggSinkLocalState::_spill_partition(
         RETURN_IF_ERROR(spill_file->create_writer(state, 
Base::operator_profile(), writer));
     }
 
-    RETURN_IF_ERROR(writer->write_block(state, block_));
+    RETURN_IF_ERROR(writer->write_block(state, _block));
     _reset_tmp_data();
     return Status::OK();
 }
@@ -411,7 +412,7 @@ Status 
PartitionedAggSinkLocalState::_spill_hash_table(RuntimeState* state,
     return Status::OK();
 }
 
-Status PartitionedAggSinkLocalState::revoke_memory(RuntimeState* state) {
+Status PartitionedAggSinkLocalState::_revoke_memory(RuntimeState* state) {
     if (_eos) {
         return Status::OK();
     }
@@ -425,9 +426,9 @@ Status 
PartitionedAggSinkLocalState::revoke_memory(RuntimeState* state) {
     if (!_shared_state->_is_spilled) {
         _shared_state->_is_spilled = true;
         custom_profile()->add_info_string("Spilled", "true");
-        update_profile<false>(sink_local_state->custom_profile());
+        _update_profile<false>(sink_local_state->custom_profile());
     } else {
-        update_profile<false>(sink_local_state->custom_profile());
+        _update_profile<false>(sink_local_state->custom_profile());
     }
 
     
DBUG_EXECUTE_IF("fault_inject::partitioned_agg_sink::revoke_memory_submit_func",
 {
@@ -493,39 +494,39 @@ Status 
PartitionedAggSinkLocalState::revoke_memory(RuntimeState* state) {
 }
 
 void PartitionedAggSinkLocalState::_reset_tmp_data() {
-    block_.clear();
-    key_columns_.clear();
-    value_columns_.clear();
-    key_block_.clear_column_data();
-    value_block_.clear_column_data();
-    key_columns_ = key_block_.mutate_columns();
-    value_columns_ = value_block_.mutate_columns();
+    _block.clear();
+    _key_columns.clear();
+    _value_columns.clear();
+    _key_block.clear_column_data();
+    _value_block.clear_column_data();
+    _key_columns = _key_block.mutate_columns();
+    _value_columns = _value_block.mutate_columns();
 }
 
 void PartitionedAggSinkLocalState::_clear_tmp_data() {
     {
         vectorized::Block empty_block;
-        block_.swap(empty_block);
+        _block.swap(empty_block);
     }
     {
         vectorized::Block empty_block;
-        key_block_.swap(empty_block);
+        _key_block.swap(empty_block);
     }
     {
         vectorized::Block empty_block;
-        value_block_.swap(empty_block);
+        _value_block.swap(empty_block);
     }
     {
         vectorized::MutableColumns cols;
-        key_columns_.swap(cols);
+        _key_columns.swap(cols);
     }
     {
         vectorized::MutableColumns cols;
-        value_columns_.swap(cols);
+        _value_columns.swap(cols);
     }
 
     vectorized::DataTypes tmp_value_data_types;
-    value_data_types_.swap(tmp_value_data_types);
+    _value_data_types.swap(tmp_value_data_types);
 }
 
 bool PartitionedAggSinkLocalState::is_blockable() const {
diff --git a/be/src/pipeline/exec/partitioned_aggregation_sink_operator.h b/be/src/pipeline/exec/partitioned_aggregation_sink_operator.h
index 5538c296562..6b2da40f5ff 100644
--- a/be/src/pipeline/exec/partitioned_aggregation_sink_operator.h
+++ b/be/src/pipeline/exec/partitioned_aggregation_sink_operator.h
@@ -42,20 +42,21 @@ public:
     PartitionedAggSinkLocalState(DataSinkOperatorXBase* parent, RuntimeState* 
state);
     ~PartitionedAggSinkLocalState() override = default;
 
-    friend class PartitionedAggSinkOperatorX;
-
     Status init(RuntimeState* state, LocalSinkStateInfo& info) override;
     Status open(RuntimeState* state) override;
     Status close(RuntimeState* state, Status exec_status) override;
 
-    Status revoke_memory(RuntimeState* state);
+    bool is_blockable() const override;
 
-    Status setup_in_memory_agg_op(RuntimeState* state);
+private:
+    friend class PartitionedAggSinkOperatorX;
 
-    template <bool spilled>
-    void update_profile(RuntimeProfile* child_profile);
+    Status _revoke_memory(RuntimeState* state);
 
-    bool is_blockable() const override;
+    Status _setup_in_memory_agg_op(RuntimeState* state);
+
+    template <bool spilled>
+    void _update_profile(RuntimeProfile* child_profile);
 
     template <typename KeyType>
     struct TmpSpillInfo {
@@ -74,9 +75,9 @@ public:
                             const vectorized::AggregateDataPtr null_key_data, 
bool is_last);
 
     template <typename HashTableCtxType, typename KeyType>
-    Status to_block(HashTableCtxType& context, std::vector<KeyType>& keys,
-                    std::vector<vectorized::AggregateDataPtr>& values,
-                    const vectorized::AggregateDataPtr null_key_data);
+    Status _to_block(HashTableCtxType& context, std::vector<KeyType>& keys,
+                     std::vector<vectorized::AggregateDataPtr>& values,
+                     const vectorized::AggregateDataPtr null_key_data);
 
     void _reset_tmp_data();
     void _clear_tmp_data();
@@ -85,12 +86,12 @@ public:
     std::unique_ptr<RuntimeState> _runtime_state;
 
     // temp structures during spilling
-    vectorized::MutableColumns key_columns_;
-    vectorized::MutableColumns value_columns_;
-    vectorized::DataTypes value_data_types_;
-    vectorized::Block block_;
-    vectorized::Block key_block_;
-    vectorized::Block value_block_;
+    vectorized::MutableColumns _key_columns;
+    vectorized::MutableColumns _value_columns;
+    vectorized::DataTypes _value_data_types;
+    vectorized::Block _block;
+    vectorized::Block _key_block;
+    vectorized::Block _value_block;
 
     std::unique_ptr<RuntimeProfile> _internal_runtime_profile;
     RuntimeProfile::Counter* _memory_usage_reserved = nullptr;
diff --git a/be/src/pipeline/exec/partitioned_aggregation_source_operator.cpp b/be/src/pipeline/exec/partitioned_aggregation_source_operator.cpp
index fe2ebb2df78..9f87efa490e 100644
--- a/be/src/pipeline/exec/partitioned_aggregation_source_operator.cpp
+++ b/be/src/pipeline/exec/partitioned_aggregation_source_operator.cpp
@@ -67,7 +67,7 @@ Status PartitionedAggLocalState::open(RuntimeState* state) {
         return Status::OK();
     }
     _opened = true;
-    RETURN_IF_ERROR(setup_in_memory_agg_op(state));
+    RETURN_IF_ERROR(_setup_in_memory_agg_op(state));
 
     return Status::OK();
 }
@@ -76,7 +76,7 @@ Status PartitionedAggLocalState::open(RuntimeState* state) {
     update_profile_from_inner_profile<spilled>(name, custom_profile(), 
child_profile)
 
 template <bool spilled>
-void PartitionedAggLocalState::update_profile(RuntimeProfile* child_profile) {
+void PartitionedAggLocalState::_update_profile(RuntimeProfile* child_profile) {
     UPDATE_COUNTER_FROM_INNER("GetResultsTime");
     UPDATE_COUNTER_FROM_INNER("HashTableIterateTime");
     UPDATE_COUNTER_FROM_INNER("InsertKeysToColumnTime");
@@ -219,7 +219,7 @@ Status 
PartitionedAggSourceOperatorX::revoke_memory(RuntimeState* state) {
                               
PrettyPrinter::print_bytes(local_state._estimate_memory_usage));
 
     // Flush hash table + repartition remaining spill files of the current 
partition.
-    RETURN_IF_ERROR(local_state.flush_and_repartition(state));
+    RETURN_IF_ERROR(local_state._flush_and_repartition(state));
     local_state._current_partition = AggSpillPartitionInfo {};
     local_state._need_to_setup_partition = true;
     return Status::OK();
@@ -241,7 +241,7 @@ Status 
PartitionedAggSourceOperatorX::get_block(RuntimeState* state, vectorized:
         if (*eos) {
             auto* source_local_state =
                     
runtime_state->get_local_state(_agg_source_operator->operator_id());
-            
local_state.update_profile<false>(source_local_state->custom_profile());
+            
local_state._update_profile<false>(source_local_state->custom_profile());
         }
         local_state.reached_limit(block, eos);
         return Status::OK();
@@ -314,7 +314,7 @@ Status 
PartitionedAggSourceOperatorX::get_block(RuntimeState* state, vectorized:
     if (inner_eos) {
         auto* source_local_state =
                 
runtime_state->get_local_state(_agg_source_operator->operator_id());
-        local_state.update_profile<true>(source_local_state->custom_profile());
+        
local_state._update_profile<true>(source_local_state->custom_profile());
 
         // Current partition fully output. Reset hash table, pop next 
partition.
         RETURN_IF_ERROR(_agg_source_operator->reset_hash_table(runtime_state));
@@ -387,7 +387,7 @@ Status 
PartitionedAggLocalState::_recover_blocks_from_partition(RuntimeState* st
     return Status::OK();
 }
 
-Status PartitionedAggLocalState::setup_in_memory_agg_op(RuntimeState* state) {
+Status PartitionedAggLocalState::_setup_in_memory_agg_op(RuntimeState* state) {
     _runtime_state = RuntimeState::create_unique(
             state->fragment_instance_id(), state->query_id(), 
state->fragment_id(),
             state->query_options(), TQueryGlobals {}, state->exec_env(), 
state->get_query_ctx());
@@ -416,7 +416,7 @@ Status 
PartitionedAggLocalState::setup_in_memory_agg_op(RuntimeState* state) {
     return source_local_state->open(state);
 }
 
-Status 
PartitionedAggLocalState::flush_hash_table_to_sub_spill_files(RuntimeState* 
state) {
+Status 
PartitionedAggLocalState::_flush_hash_table_to_sub_spill_files(RuntimeState* 
state) {
     auto* runtime_state = _runtime_state.get();
     auto& p = _parent->cast<PartitionedAggSourceOperatorX>();
     auto* in_mem_state = _shared_state->_in_mem_shared_state;
@@ -439,7 +439,7 @@ Status 
PartitionedAggLocalState::flush_hash_table_to_sub_spill_files(RuntimeStat
     return Status::OK();
 }
 
-Status PartitionedAggLocalState::flush_and_repartition(RuntimeState* state) {
+Status PartitionedAggLocalState::_flush_and_repartition(RuntimeState* state) {
     auto& p = _parent->cast<PartitionedAggSourceOperatorX>();
     const int new_level = _current_partition.level + 1;
 
@@ -459,7 +459,7 @@ Status 
PartitionedAggLocalState::flush_and_repartition(RuntimeState* state) {
     {
         auto* source_local_state =
                 
_runtime_state->get_local_state(p._agg_source_operator->operator_id());
-        update_profile<true>(source_local_state->custom_profile());
+        _update_profile<true>(source_local_state->custom_profile());
     }
 
     // 1. Create FANOUT output sub-spill-files.
@@ -485,7 +485,7 @@ Status 
PartitionedAggLocalState::flush_and_repartition(RuntimeState* state) {
     RETURN_IF_ERROR(_repartitioner.setup_output(state, output_spill_files));
 
     // 2. Flush the in-memory hash table into the sub-spill-files.
-    RETURN_IF_ERROR(flush_hash_table_to_sub_spill_files(state));
+    RETURN_IF_ERROR(_flush_hash_table_to_sub_spill_files(state));
 
     // 3. Route any in-memory blocks that were recovered but not yet merged
     //    into the hash table. These blocks were already read from the file
diff --git a/be/src/pipeline/exec/partitioned_aggregation_source_operator.h b/be/src/pipeline/exec/partitioned_aggregation_source_operator.h
index 518e59b5ac5..54f9aad3f8d 100644
--- a/be/src/pipeline/exec/partitioned_aggregation_source_operator.h
+++ b/be/src/pipeline/exec/partitioned_aggregation_source_operator.h
@@ -62,25 +62,25 @@ public:
     Status open(RuntimeState* state) override;
     Status close(RuntimeState* state) override;
 
-    Status setup_in_memory_agg_op(RuntimeState* state);
+    bool is_blockable() const override;
 
-    template <bool spilled>
-    void update_profile(RuntimeProfile* child_profile);
+private:
+    friend class PartitionedAggSourceOperatorX;
 
-    bool is_blockable() const override;
+    Status _setup_in_memory_agg_op(RuntimeState* state);
+
+    template <bool spilled>
+    void _update_profile(RuntimeProfile* child_profile);
 
     /// Flush the current in-memory hash table by draining it as blocks and 
routing
     /// each block through the repartitioner into the output sub-spill-files.
-    Status flush_hash_table_to_sub_spill_files(RuntimeState* state);
+    Status _flush_hash_table_to_sub_spill_files(RuntimeState* state);
 
     /// Flush the in-memory hash table into FANOUT sub-spill-files, 
repartition remaining
     /// unread spill files from `remaining_spill_files`, and push resulting 
sub-partitions into
     /// `_partition_queue`. After this call the hash table is reset and
     /// `remaining_spill_files` is cleared.
-    Status flush_and_repartition(RuntimeState* state);
-
-private:
-    friend class PartitionedAggSourceOperatorX;
+    Status _flush_and_repartition(RuntimeState* state);
 
     /// Move all original spill_partitions from shared state into 
`_partition_queue`.
     /// Called once when spilled get_block is first entered.
@@ -106,7 +106,7 @@ private:
     std::vector<vectorized::Block> _blocks;
 
     // Estimated in-memory hash table size for the current partition.
-    //size_t _estimate_memory_usage = 0;
+    size_t _estimate_memory_usage = 0;
 
     // Counters to track spill partition metrics
     RuntimeProfile::Counter* _max_partition_level = nullptr;
diff --git a/be/test/vec/spill/spill_file_test.cpp b/be/test/vec/spill/spill_file_test.cpp
new file mode 100644
index 00000000000..2313ed53614
--- /dev/null
+++ b/be/test/vec/spill/spill_file_test.cpp
@@ -0,0 +1,1040 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "vec/spill/spill_file.h"
+
+#include <gtest/gtest.h>
+
+#include <algorithm>
+#include <memory>
+#include <numeric>
+#include <vector>
+
+#include "common/config.h"
+#include "io/fs/local_file_system.h"
+#include "runtime/exec_env.h"
+#include "testutil/column_helper.h"
+#include "testutil/mock/mock_runtime_state.h"
+#include "util/runtime_profile.h"
+#include "vec/core/block.h"
+#include "vec/data_types/data_type_number.h"
+#include "vec/data_types/data_type_string.h"
+#include "vec/spill/spill_file_manager.h"
+#include "vec/spill/spill_file_reader.h"
+#include "vec/spill/spill_file_writer.h"
+
+namespace doris::vectorized {
+
+/// Shared fixture for the SpillFile / SpillFileWriter / SpillFileReader tests.
+///
+/// SetUp() builds a small profile tree ("test" -> "CustomCounters" / "CommonCounters"),
+/// registers the spill counters that the tests later read back through get_counter(),
+/// creates a scratch spill directory and installs a fresh SpillFileManager into the
+/// process-wide ExecEnv. TearDown() undoes all of that.
+class SpillFileTest : public testing::Test {
+protected:
+    void SetUp() override {
+        _runtime_state = std::make_unique<MockRuntimeState>();
+
+        _profile = std::make_unique<RuntimeProfile>("test");
+        _custom_profile = std::make_unique<RuntimeProfile>("CustomCounters");
+        _common_profile = std::make_unique<RuntimeProfile>("CommonCounters");
+
+        _common_profile->AddHighWaterMarkCounter("MemoryUsage", TUnit::BYTES, "", 1);
+        ADD_TIMER_WITH_LEVEL(_common_profile.get(), "ExecTime", 1);
+
+        // Write-side counters.
+        ADD_TIMER_WITH_LEVEL(_custom_profile.get(), "SpillTotalTime", 1);
+        ADD_TIMER_WITH_LEVEL(_custom_profile.get(), "SpillWriteTime", 1);
+        ADD_COUNTER_WITH_LEVEL(_custom_profile.get(), "SpillWriteTaskWaitInQueueCount", TUnit::UNIT,
+                               1);
+        ADD_COUNTER_WITH_LEVEL(_custom_profile.get(), "SpillWriteTaskCount", TUnit::UNIT, 1);
+        ADD_TIMER_WITH_LEVEL(_custom_profile.get(), "SpillWriteTaskWaitInQueueTime", 1);
+        ADD_TIMER_WITH_LEVEL(_custom_profile.get(), "SpillWriteFileTime", 1);
+        ADD_TIMER_WITH_LEVEL(_custom_profile.get(), "SpillWriteSerializeBlockTime", 1);
+        ADD_COUNTER_WITH_LEVEL(_custom_profile.get(), "SpillWriteBlockCount", TUnit::UNIT, 1);
+        ADD_COUNTER_WITH_LEVEL(_custom_profile.get(), "SpillWriteBlockBytes", TUnit::BYTES, 1);
+        ADD_COUNTER_WITH_LEVEL(_custom_profile.get(), "SpillWriteFileBytes", TUnit::BYTES, 1);
+        ADD_COUNTER_WITH_LEVEL(_custom_profile.get(), "SpillWriteRows", TUnit::UNIT, 1);
+
+        // Read-side counters.
+        ADD_COUNTER_WITH_LEVEL(_custom_profile.get(), "SpillReadFileTime", TUnit::UNIT, 1);
+        ADD_COUNTER_WITH_LEVEL(_custom_profile.get(), "SpillReadDerializeBlockTime", TUnit::UNIT,
+                               1);
+        ADD_COUNTER_WITH_LEVEL(_custom_profile.get(), "SpillReadBlockCount", TUnit::UNIT, 1);
+        ADD_COUNTER_WITH_LEVEL(_custom_profile.get(), "SpillReadBlockBytes", TUnit::UNIT, 1);
+        ADD_COUNTER_WITH_LEVEL(_custom_profile.get(), "SpillReadFileBytes", TUnit::UNIT, 1);
+        ADD_COUNTER_WITH_LEVEL(_custom_profile.get(), "SpillReadRows", TUnit::UNIT, 1);
+        ADD_COUNTER_WITH_LEVEL(_custom_profile.get(), "SpillReadFileCount", TUnit::UNIT, 1);
+
+        // File bookkeeping counters.
+        ADD_COUNTER_WITH_LEVEL(_custom_profile.get(), "SpillWriteFileTotalCount", TUnit::UNIT, 1);
+        ADD_COUNTER_WITH_LEVEL(_custom_profile.get(), "SpillWriteFileCurrentCount", TUnit::UNIT, 1);
+        ADD_COUNTER_WITH_LEVEL(_custom_profile.get(), "SpillWriteFileCurrentBytes", TUnit::UNIT, 1);
+
+        _profile->add_child(_custom_profile.get(), true);
+        _profile->add_child(_common_profile.get(), true);
+
+        _spill_dir = "./ut_dir/spill_file_test";
+        auto spill_data_dir = std::make_unique<SpillDataDir>(_spill_dir, 1024L * 1024 * 128);
+        auto st = io::global_local_filesystem()->create_directory(spill_data_dir->path(), false);
+        ASSERT_TRUE(st.ok()) << "create directory failed: " << st.to_string();
+
+        std::unordered_map<std::string, std::unique_ptr<SpillDataDir>> data_map;
+        // Keep a non-owning handle; ownership moves into the manager through data_map.
+        _data_dir_ptr = spill_data_dir.get();
+        data_map.emplace("test", std::move(spill_data_dir));
+        auto* spill_file_manager = new SpillFileManager(std::move(data_map));
+        ExecEnv::GetInstance()->_spill_file_mgr = spill_file_manager;
+        st = spill_file_manager->init();
+        ASSERT_TRUE(st.ok()) << "init spill file manager failed: " << st.to_string();
+    }
+
+    void TearDown() override {
+        // gtest still calls TearDown() when SetUp() bailed out on a fatal assertion, in
+        // which case no manager may have been installed; never dereference it blindly.
+        if (auto* spill_file_mgr = ExecEnv::GetInstance()->spill_file_mgr();
+            spill_file_mgr != nullptr) {
+            spill_file_mgr->stop();
+        }
+        SAFE_DELETE(ExecEnv::GetInstance()->_spill_file_mgr);
+        // The SpillDataDir was handed over to the manager that was just deleted, so the
+        // cached raw pointer must not be used any more.
+        _data_dir_ptr = nullptr;
+
+        // Best-effort cleanup of the scratch directory; a failure here must not fail the test.
+        auto st = io::global_local_filesystem()->delete_directory(_spill_dir);
+        (void)st;
+        _runtime_state.reset();
+    }
+
+    /// Builds a one-column Int32 block holding `data`.
+    Block _create_int_block(const std::vector<int32_t>& data) {
+        return ColumnHelper::create_block<DataTypeInt32>(data);
+    }
+
+    /// Builds a two-column block: Int32 values from `col1`, then Int64 values from `col2`.
+    Block _create_two_column_block(const std::vector<int32_t>& col1,
+                                   const std::vector<int64_t>& col2) {
+        auto block = ColumnHelper::create_block<DataTypeInt32>(col1);
+        block.insert(ColumnHelper::create_column_with_name<DataTypeInt64>(col2));
+        return block;
+    }
+
+    std::unique_ptr<MockRuntimeState> _runtime_state;
+    std::unique_ptr<RuntimeProfile> _profile;
+    std::unique_ptr<RuntimeProfile> _custom_profile;
+    std::unique_ptr<RuntimeProfile> _common_profile;
+    std::string _spill_dir;
+    // Non-owning; only valid between SetUp() and TearDown().
+    SpillDataDir* _data_dir_ptr = nullptr;
+};
+
+// ═══════════════════════════════════════════════════════════════════════
+// SpillFile basic tests
+// ═══════════════════════════════════════════════════════════════════════
+
+// A freshly created spill file exists but is not readable until a writer has been closed.
+TEST_F(SpillFileTest, CreateSpillFile) {
+    auto* spill_mgr = ExecEnv::GetInstance()->spill_file_mgr();
+
+    SpillFileSPtr file;
+    auto status = spill_mgr->create_spill_file("test_query/test_file", file);
+    ASSERT_TRUE(status.ok()) << status.to_string();
+
+    ASSERT_TRUE(file != nullptr);
+    ASSERT_FALSE(file->ready_for_reading());
+}
+
+// Closing a writer without writing anything still marks the file readable, and a reader
+// over that empty file opens cleanly and reports end-of-stream on its first read.
+TEST_F(SpillFileTest, CreateWriterAndReader) {
+    auto* spill_mgr = ExecEnv::GetInstance()->spill_file_mgr();
+
+    SpillFileSPtr file;
+    auto status = spill_mgr->create_spill_file("test_query/create_wr", file);
+    ASSERT_TRUE(status.ok()) << status.to_string();
+
+    // Writer side: create it and close it straight away.
+    SpillFileWriterSPtr file_writer;
+    status = file->create_writer(_runtime_state.get(), _profile.get(), file_writer);
+    ASSERT_TRUE(status.ok()) << status.to_string();
+    ASSERT_TRUE(file_writer != nullptr);
+
+    status = file_writer->close();
+    ASSERT_TRUE(status.ok()) << status.to_string();
+    ASSERT_TRUE(file->ready_for_reading());
+
+    // Reader side: the file has no parts, so the very first read must hit EOS.
+    auto file_reader = file->create_reader(_runtime_state.get(), _profile.get());
+    ASSERT_TRUE(file_reader != nullptr);
+
+    status = file_reader->open();
+    ASSERT_TRUE(status.ok()) << status.to_string();
+
+    Block out_block;
+    bool reached_eos = false;
+    status = file_reader->read(&out_block, &reached_eos);
+    ASSERT_TRUE(status.ok()) << status.to_string();
+    ASSERT_TRUE(reached_eos);
+
+    status = file_reader->close();
+    ASSERT_TRUE(status.ok()) << status.to_string();
+}
+
+// ═══════════════════════════════════════════════════════════════════════
+// SpillFileWriter tests
+// ═══════════════════════════════════════════════════════════════════════
+
+// Writing one 5-row block makes the file readable after close and is reflected in the
+// "SpillWriteRows" counter.
+TEST_F(SpillFileTest, WriteSingleBlock) {
+    auto* spill_mgr = ExecEnv::GetInstance()->spill_file_mgr();
+
+    SpillFileSPtr file;
+    auto status = spill_mgr->create_spill_file("test_query/single_block", file);
+    ASSERT_TRUE(status.ok());
+
+    SpillFileWriterSPtr file_writer;
+    status = file->create_writer(_runtime_state.get(), _profile.get(), file_writer);
+    ASSERT_TRUE(status.ok());
+
+    auto input = _create_int_block({1, 2, 3, 4, 5});
+    status = file_writer->write_block(_runtime_state.get(), input);
+    ASSERT_TRUE(status.ok()) << status.to_string();
+
+    status = file_writer->close();
+    ASSERT_TRUE(status.ok()) << status.to_string();
+
+    ASSERT_TRUE(file->ready_for_reading());
+
+    // The counter was registered on the custom profile by the fixture.
+    auto* rows_written = _custom_profile->get_counter("SpillWriteRows");
+    ASSERT_TRUE(rows_written != nullptr);
+    ASSERT_EQ(rows_written->value(), 5);
+}
+
+// Writing several blocks through one writer accumulates both the row counter and the
+// block counter.
+TEST_F(SpillFileTest, WriteMultipleBlocks) {
+    constexpr int kBlockCount = 5;
+    constexpr int kRowsPerBlock = 3;
+
+    SpillFileSPtr spill_file;
+    auto st = ExecEnv::GetInstance()->spill_file_mgr()->create_spill_file("test_query/multi_blocks",
+                                                                          spill_file);
+    ASSERT_TRUE(st.ok());
+
+    SpillFileWriterSPtr writer;
+    st = spill_file->create_writer(_runtime_state.get(), _profile.get(), writer);
+    ASSERT_TRUE(st.ok());
+
+    for (int i = 0; i < kBlockCount; ++i) {
+        // Each block carries kRowsPerBlock distinct values: 10*i, 10*i + 1, 10*i + 2.
+        auto block = _create_int_block({i * 10, i * 10 + 1, i * 10 + 2});
+        st = writer->write_block(_runtime_state.get(), block);
+        ASSERT_TRUE(st.ok()) << "write block " << i << " failed: " << st.to_string();
+    }
+
+    st = writer->close();
+    ASSERT_TRUE(st.ok()) << st.to_string();
+
+    // Check the lookups before dereferencing so that a missing counter shows up as a test
+    // failure instead of a crash (same pattern as WriteSingleBlock).
+    auto* write_rows_counter = _custom_profile->get_counter("SpillWriteRows");
+    ASSERT_TRUE(write_rows_counter != nullptr);
+    ASSERT_EQ(write_rows_counter->value(), kBlockCount * kRowsPerBlock);
+
+    auto* write_block_counter = _custom_profile->get_counter("SpillWriteBlockCount");
+    ASSERT_TRUE(write_block_counter != nullptr);
+    ASSERT_EQ(write_block_counter->value(), kBlockCount);
+}
+
+// A block with two columns (Int32 + Int64) can be written, and the file becomes readable
+// once the writer is closed.
+TEST_F(SpillFileTest, WriteTwoColumnBlock) {
+    auto* spill_mgr = ExecEnv::GetInstance()->spill_file_mgr();
+
+    SpillFileSPtr file;
+    auto status = spill_mgr->create_spill_file("test_query/two_col", file);
+    ASSERT_TRUE(status.ok());
+
+    SpillFileWriterSPtr file_writer;
+    status = file->create_writer(_runtime_state.get(), _profile.get(), file_writer);
+    ASSERT_TRUE(status.ok());
+
+    auto input = _create_two_column_block({1, 2, 3}, {100, 200, 300});
+    status = file_writer->write_block(_runtime_state.get(), input);
+    ASSERT_TRUE(status.ok()) << status.to_string();
+
+    status = file_writer->close();
+    ASSERT_TRUE(status.ok()) << status.to_string();
+    ASSERT_TRUE(file->ready_for_reading());
+}
+
+// Writing a default-constructed Block and then closing the writer must both
+// return an OK status.
+TEST_F(SpillFileTest, WriteEmptyBlock) {
+    SpillFileSPtr file;
+    auto status = ExecEnv::GetInstance()->spill_file_mgr()->create_spill_file(
+            "test_query/empty_block", file);
+    ASSERT_TRUE(status.ok());
+
+    SpillFileWriterSPtr file_writer;
+    status = file->create_writer(_runtime_state.get(), _profile.get(), file_writer);
+    ASSERT_TRUE(status.ok());
+
+    Block nothing_to_write;
+    status = file_writer->write_block(_runtime_state.get(), nothing_to_write);
+    ASSERT_TRUE(status.ok()) << status.to_string();
+
+    status = file_writer->close();
+    ASSERT_TRUE(status.ok()) << status.to_string();
+}
+
+// Closing a writer a second time must still return an OK status.
+TEST_F(SpillFileTest, DoubleCloseWriter) {
+    SpillFileSPtr file;
+    auto status = ExecEnv::GetInstance()->spill_file_mgr()->create_spill_file(
+            "test_query/double_close", file);
+    ASSERT_TRUE(status.ok());
+
+    SpillFileWriterSPtr file_writer;
+    status = file->create_writer(_runtime_state.get(), _profile.get(), file_writer);
+    ASSERT_TRUE(status.ok());
+
+    auto payload = _create_int_block({1, 2, 3});
+    status = file_writer->write_block(_runtime_state.get(), payload);
+    ASSERT_TRUE(status.ok());
+
+    // First close: the regular one.
+    status = file_writer->close();
+    ASSERT_TRUE(status.ok());
+
+    // Second close: expected to be a harmless no-op.
+    status = file_writer->close();
+    ASSERT_TRUE(status.ok());
+}
+
+// ═══════════════════════════════════════════════════════════════════════
+// SpillFileReader tests
+// ═══════════════════════════════════════════════════════════════════════
+
+// Writes one 5-row block, reads it back, checks every value, and then
+// expects the following read to report end-of-stream.
+TEST_F(SpillFileTest, ReadSingleBlock) {
+    const std::vector<int32_t> expected_values = {10, 20, 30, 40, 50};
+
+    SpillFileSPtr file;
+    auto status = ExecEnv::GetInstance()->spill_file_mgr()->create_spill_file(
+            "test_query/read_single", file);
+    ASSERT_TRUE(status.ok());
+
+    // Write phase; the writer is confined to this scope.
+    {
+        SpillFileWriterSPtr file_writer;
+        status = file->create_writer(_runtime_state.get(), _profile.get(), file_writer);
+        ASSERT_TRUE(status.ok());
+
+        auto input = _create_int_block(expected_values);
+        status = file_writer->write_block(_runtime_state.get(), input);
+        ASSERT_TRUE(status.ok());
+
+        status = file_writer->close();
+        ASSERT_TRUE(status.ok());
+    }
+
+    // Read phase.
+    auto file_reader = file->create_reader(_runtime_state.get(), _profile.get());
+    status = file_reader->open();
+    ASSERT_TRUE(status.ok()) << status.to_string();
+
+    bool reached_end = false;
+    Block first;
+    status = file_reader->read(&first, &reached_end);
+    ASSERT_TRUE(status.ok()) << status.to_string();
+    ASSERT_FALSE(reached_end);
+    ASSERT_EQ(first.rows(), 5);
+
+    // Every row must come back unchanged and in order.
+    auto values = first.get_by_position(0).column;
+    for (size_t row = 0; row < expected_values.size(); ++row) {
+        ASSERT_EQ(values->get_int(row), expected_values[row]);
+    }
+
+    // Only one block was written, so the next read must signal EOS.
+    Block second;
+    status = file_reader->read(&second, &reached_end);
+    ASSERT_TRUE(status.ok());
+    ASSERT_TRUE(reached_end);
+
+    status = file_reader->close();
+    ASSERT_TRUE(status.ok());
+}
+
+// Writes 10 blocks of 100 rows each and checks that reading until EOS yields
+// the same number of blocks and the same total row count.
+TEST_F(SpillFileTest, ReadMultipleBlocks) {
+    const int num_blocks = 10;
+    const int rows_per_block = 100;
+
+    SpillFileSPtr file;
+    auto status = ExecEnv::GetInstance()->spill_file_mgr()->create_spill_file(
+            "test_query/read_multi", file);
+    ASSERT_TRUE(status.ok());
+
+    // Write phase: block b holds the consecutive integers starting at b * rows_per_block.
+    {
+        SpillFileWriterSPtr file_writer;
+        status = file->create_writer(_runtime_state.get(), _profile.get(), file_writer);
+        ASSERT_TRUE(status.ok());
+
+        for (int b = 0; b < num_blocks; ++b) {
+            std::vector<int32_t> values(rows_per_block);
+            std::iota(values.begin(), values.end(), b * rows_per_block);
+            auto input = _create_int_block(values);
+            status = file_writer->write_block(_runtime_state.get(), input);
+            ASSERT_TRUE(status.ok());
+        }
+
+        status = file_writer->close();
+        ASSERT_TRUE(status.ok());
+    }
+
+    // Read phase: drain the reader and tally what comes back.
+    auto file_reader = file->create_reader(_runtime_state.get(), _profile.get());
+    status = file_reader->open();
+    ASSERT_TRUE(status.ok());
+
+    size_t rows_seen = 0;
+    int blocks_seen = 0;
+    bool reached_end = false;
+    do {
+        Block chunk;
+        status = file_reader->read(&chunk, &reached_end);
+        ASSERT_TRUE(status.ok()) << status.to_string();
+        if (reached_end) {
+            break;
+        }
+        rows_seen += chunk.rows();
+        ++blocks_seen;
+    } while (!reached_end);
+
+    ASSERT_EQ(rows_seen, num_blocks * rows_per_block);
+    ASSERT_EQ(blocks_seen, num_blocks);
+
+    status = file_reader->close();
+    ASSERT_TRUE(status.ok());
+}
+
+// Writes a 4-row, 2-column block and checks the shape plus the first and last
+// value of each column after reading it back.
+TEST_F(SpillFileTest, ReadTwoColumnBlock) {
+    SpillFileSPtr file;
+    auto status = ExecEnv::GetInstance()->spill_file_mgr()->create_spill_file(
+            "test_query/read_two_col", file);
+    ASSERT_TRUE(status.ok());
+
+    // Write phase.
+    {
+        SpillFileWriterSPtr file_writer;
+        status = file->create_writer(_runtime_state.get(), _profile.get(), file_writer);
+        ASSERT_TRUE(status.ok());
+
+        auto input = _create_two_column_block({1, 2, 3, 4}, {100, 200, 300, 400});
+        status = file_writer->write_block(_runtime_state.get(), input);
+        ASSERT_TRUE(status.ok());
+
+        status = file_writer->close();
+        ASSERT_TRUE(status.ok());
+    }
+
+    // Read phase.
+    auto file_reader = file->create_reader(_runtime_state.get(), _profile.get());
+    status = file_reader->open();
+    ASSERT_TRUE(status.ok());
+
+    bool reached_end = false;
+    Block output;
+    status = file_reader->read(&output, &reached_end);
+    ASSERT_TRUE(status.ok());
+    ASSERT_FALSE(reached_end);
+    ASSERT_EQ(output.rows(), 4);
+    ASSERT_EQ(output.columns(), 2);
+
+    // First column: spot-check both ends.
+    auto left = output.get_by_position(0).column;
+    ASSERT_EQ(left->get_int(0), 1);
+    ASSERT_EQ(left->get_int(3), 4);
+
+    // Second column: spot-check both ends.
+    auto right = output.get_by_position(1).column;
+    ASSERT_EQ(right->get_int(0), 100);
+    ASSERT_EQ(right->get_int(3), 400);
+
+    status = file_reader->close();
+    ASSERT_TRUE(status.ok());
+}
+
+// ═══════════════════════════════════════════════════════════════════════
+// Roundtrip tests (write -> read -> verify)
+// ═══════════════════════════════════════════════════════════════════════
+
+// Round-trips one block of mixed positive, zero and negative values and
+// compares every row with the input vector.
+TEST_F(SpillFileTest, RoundtripSingleBlock) {
+    std::vector<int32_t> original_data = {42, 7, 99, 1, 0, -5, 1000};
+
+    SpillFileSPtr file;
+    auto status = ExecEnv::GetInstance()->spill_file_mgr()->create_spill_file(
+            "test_query/roundtrip_single", file);
+    ASSERT_TRUE(status.ok());
+
+    // Write phase.
+    {
+        SpillFileWriterSPtr file_writer;
+        status = file->create_writer(_runtime_state.get(), _profile.get(), file_writer);
+        ASSERT_TRUE(status.ok());
+
+        auto input = _create_int_block(original_data);
+        status = file_writer->write_block(_runtime_state.get(), input);
+        ASSERT_TRUE(status.ok());
+
+        status = file_writer->close();
+        ASSERT_TRUE(status.ok());
+    }
+
+    // Read phase and verification.
+    auto file_reader = file->create_reader(_runtime_state.get(), _profile.get());
+    status = file_reader->open();
+    ASSERT_TRUE(status.ok());
+
+    bool reached_end = false;
+    Block output;
+    status = file_reader->read(&output, &reached_end);
+    ASSERT_TRUE(status.ok());
+    ASSERT_EQ(output.rows(), original_data.size());
+
+    auto values = output.get_by_position(0).column;
+    for (size_t row = 0; row < original_data.size(); ++row) {
+        ASSERT_EQ(values->get_int(row), original_data[row]) << "mismatch at index " << row;
+    }
+
+    status = file_reader->close();
+    ASSERT_TRUE(status.ok());
+}
+
+// Round-trips four blocks of different sizes and verifies, block by block and
+// row by row, that the reader returns exactly what was written: no fewer and
+// no more blocks.
+TEST_F(SpillFileTest, RoundtripMultipleBlocks) {
+    SpillFileSPtr spill_file;
+    auto st = ExecEnv::GetInstance()->spill_file_mgr()->create_spill_file(
+            "test_query/roundtrip_multi", spill_file);
+    ASSERT_TRUE(st.ok());
+
+    std::vector<std::vector<int32_t>> all_data = {
+            {1, 2, 3},
+            {10, 20, 30, 40},
+            {100, 200},
+            {-1, -2, -3, -4, -5},
+    };
+
+    // Write
+    {
+        SpillFileWriterSPtr writer;
+        st = spill_file->create_writer(_runtime_state.get(), _profile.get(), writer);
+        ASSERT_TRUE(st.ok());
+
+        for (const auto& data : all_data) {
+            auto block = _create_int_block(data);
+            st = writer->write_block(_runtime_state.get(), block);
+            ASSERT_TRUE(st.ok());
+        }
+
+        st = writer->close();
+        ASSERT_TRUE(st.ok());
+    }
+
+    // Read & verify
+    auto reader = spill_file->create_reader(_runtime_state.get(), _profile.get());
+    st = reader->open();
+    ASSERT_TRUE(st.ok());
+
+    // Keep reading until the reader itself reports EOS. Stopping as soon as the
+    // expected number of blocks has been seen would let surplus blocks slip
+    // through unnoticed.
+    size_t block_idx = 0;
+    bool eos = false;
+    while (!eos) {
+        Block block;
+        st = reader->read(&block, &eos);
+        ASSERT_TRUE(st.ok());
+        if (eos) break;
+
+        // Guards the all_data[block_idx] accesses below against extra blocks.
+        ASSERT_LT(block_idx, all_data.size()) << "reader returned more blocks than were written";
+        ASSERT_EQ(block.rows(), all_data[block_idx].size())
+                << "block " << block_idx << " row count mismatch";
+
+        auto col = block.get_by_position(0).column;
+        for (size_t i = 0; i < all_data[block_idx].size(); ++i) {
+            ASSERT_EQ(col->get_int(i), all_data[block_idx][i])
+                    << "mismatch at block " << block_idx << " row " << i;
+        }
+        ++block_idx;
+    }
+
+    ASSERT_EQ(block_idx, all_data.size());
+
+    st = reader->close();
+    ASSERT_TRUE(st.ok());
+}
+
+// Round-trips a single 100000-row block holding 0..99999 and spot-checks
+// every 1000th row against the input.
+TEST_F(SpillFileTest, RoundtripLargeData) {
+    const size_t row_count = 100000;
+    std::vector<int32_t> data(row_count);
+    std::iota(data.begin(), data.end(), 0);
+
+    SpillFileSPtr file;
+    auto status = ExecEnv::GetInstance()->spill_file_mgr()->create_spill_file(
+            "test_query/roundtrip_large", file);
+    ASSERT_TRUE(status.ok());
+
+    // Write phase.
+    {
+        SpillFileWriterSPtr file_writer;
+        status = file->create_writer(_runtime_state.get(), _profile.get(), file_writer);
+        ASSERT_TRUE(status.ok());
+
+        auto input = _create_int_block(data);
+        status = file_writer->write_block(_runtime_state.get(), input);
+        ASSERT_TRUE(status.ok());
+
+        status = file_writer->close();
+        ASSERT_TRUE(status.ok());
+    }
+
+    // Read phase and verification.
+    auto file_reader = file->create_reader(_runtime_state.get(), _profile.get());
+    status = file_reader->open();
+    ASSERT_TRUE(status.ok());
+
+    bool reached_end = false;
+    Block output;
+    status = file_reader->read(&output, &reached_end);
+    ASSERT_TRUE(status.ok());
+    ASSERT_EQ(output.rows(), row_count);
+
+    // data[row] == row thanks to the iota fill above, so compare with the input directly.
+    auto values = output.get_by_position(0).column;
+    for (size_t row = 0; row < row_count; row += 1000) {
+        ASSERT_EQ(values->get_int(row), data[row]) << "mismatch at index " << row;
+    }
+
+    status = file_reader->close();
+    ASSERT_TRUE(status.ok());
+}
+
+// ═══════════════════════════════════════════════════════════════════════
+// Part rotation tests
+// ═══════════════════════════════════════════════════════════════════════
+
+// Forces part rotation by shrinking config::spill_file_part_size_bytes to 1KB,
+// writes 20 blocks of 100 rows and checks that all blocks and rows are read back.
+TEST_F(SpillFileTest, PartRotation) {
+    // Restores the global part-size config on every exit path. ASSERT_* returns
+    // from the test body on failure, so a restore placed on the last line would
+    // be skipped and the tiny part size would leak into later tests.
+    struct PartSizeGuard {
+        decltype(config::spill_file_part_size_bytes) saved;
+        ~PartSizeGuard() { config::spill_file_part_size_bytes = saved; }
+    } part_size_guard {config::spill_file_part_size_bytes};
+
+    // Set a very small part size to force rotation
+    config::spill_file_part_size_bytes = 1024; // 1KB per part
+
+    SpillFileSPtr spill_file;
+    auto st = ExecEnv::GetInstance()->spill_file_mgr()->create_spill_file(
+            "test_query/rotation", spill_file);
+    ASSERT_TRUE(st.ok());
+
+    const int num_blocks = 20;
+
+    // Write many blocks to trigger multiple part rotations
+    {
+        SpillFileWriterSPtr writer;
+        st = spill_file->create_writer(_runtime_state.get(), _profile.get(), writer);
+        ASSERT_TRUE(st.ok());
+
+        for (int i = 0; i < num_blocks; ++i) {
+            std::vector<int32_t> data(100);
+            std::iota(data.begin(), data.end(), i * 100);
+            auto block = _create_int_block(data);
+            st = writer->write_block(_runtime_state.get(), block);
+            ASSERT_TRUE(st.ok());
+        }
+
+        st = writer->close();
+        ASSERT_TRUE(st.ok());
+    }
+
+    // Read back and verify all data across multiple parts
+    auto reader = spill_file->create_reader(_runtime_state.get(), _profile.get());
+    st = reader->open();
+    ASSERT_TRUE(st.ok());
+
+    size_t total_rows = 0;
+    int block_count = 0;
+    bool eos = false;
+    while (!eos) {
+        Block block;
+        st = reader->read(&block, &eos);
+        ASSERT_TRUE(st.ok());
+        if (!eos) {
+            total_rows += block.rows();
+            ++block_count;
+        }
+    }
+
+    ASSERT_EQ(total_rows, num_blocks * 100);
+    ASSERT_EQ(block_count, num_blocks);
+
+    st = reader->close();
+    ASSERT_TRUE(st.ok());
+}
+
+// Forces part rotation with a 512-byte part size, writes 30 blocks of 50 rows
+// and verifies every row of every block after reading them back in order.
+TEST_F(SpillFileTest, PartRotationDataIntegrity) {
+    // Restores the global part-size config on every exit path. ASSERT_* returns
+    // from the test body on failure, so a restore placed on the last line would
+    // be skipped and the tiny part size would leak into later tests.
+    struct PartSizeGuard {
+        decltype(config::spill_file_part_size_bytes) saved;
+        ~PartSizeGuard() { config::spill_file_part_size_bytes = saved; }
+    } part_size_guard {config::spill_file_part_size_bytes};
+
+    // Set a small part size to force rotation
+    config::spill_file_part_size_bytes = 512;
+
+    SpillFileSPtr spill_file;
+    auto st = ExecEnv::GetInstance()->spill_file_mgr()->create_spill_file(
+            "test_query/rotation_integrity", spill_file);
+    ASSERT_TRUE(st.ok());
+
+    std::vector<std::vector<int32_t>> all_data;
+    const int num_blocks = 30;
+
+    // Write
+    {
+        SpillFileWriterSPtr writer;
+        st = spill_file->create_writer(_runtime_state.get(), _profile.get(), writer);
+        ASSERT_TRUE(st.ok());
+
+        for (int i = 0; i < num_blocks; ++i) {
+            std::vector<int32_t> data(50);
+            std::iota(data.begin(), data.end(), i * 1000);
+            all_data.push_back(data);
+            auto block = _create_int_block(data);
+            st = writer->write_block(_runtime_state.get(), block);
+            ASSERT_TRUE(st.ok());
+        }
+
+        st = writer->close();
+        ASSERT_TRUE(st.ok());
+    }
+
+    // Read & verify data integrity across parts
+    auto reader = spill_file->create_reader(_runtime_state.get(), _profile.get());
+    st = reader->open();
+    ASSERT_TRUE(st.ok());
+
+    size_t block_idx = 0;
+    bool eos = false;
+    while (!eos) {
+        Block block;
+        st = reader->read(&block, &eos);
+        ASSERT_TRUE(st.ok());
+        if (eos) break;
+
+        // Guards the all_data[block_idx] accesses below against surplus blocks.
+        ASSERT_LT(block_idx, all_data.size());
+        ASSERT_EQ(block.rows(), all_data[block_idx].size());
+
+        auto col = block.get_by_position(0).column;
+        for (size_t i = 0; i < all_data[block_idx].size(); ++i) {
+            ASSERT_EQ(col->get_int(i), all_data[block_idx][i])
+                    << "data mismatch at block " << block_idx << " row " << i;
+        }
+        ++block_idx;
+    }
+
+    ASSERT_EQ(block_idx, all_data.size());
+
+    st = reader->close();
+    ASSERT_TRUE(st.ok());
+}
+
+// ═══════════════════════════════════════════════════════════════════════
+// Seek tests
+// ═══════════════════════════════════════════════════════════════════════
+
+// Writes five 3-row blocks, seeks the reader to index 2 and checks that the
+// next read returns the third block written ({20, 21, 22}).
+TEST_F(SpillFileTest, SeekToBlock) {
+    const int num_blocks = 5;
+
+    SpillFileSPtr file;
+    auto status = ExecEnv::GetInstance()->spill_file_mgr()->create_spill_file(
+            "test_query/seek", file);
+    ASSERT_TRUE(status.ok());
+
+    // Write phase: block i holds {10i, 10i + 1, 10i + 2}.
+    {
+        SpillFileWriterSPtr file_writer;
+        status = file->create_writer(_runtime_state.get(), _profile.get(), file_writer);
+        ASSERT_TRUE(status.ok());
+
+        for (int i = 0; i < num_blocks; ++i) {
+            auto input = _create_int_block({i * 10, i * 10 + 1, i * 10 + 2});
+            status = file_writer->write_block(_runtime_state.get(), input);
+            ASSERT_TRUE(status.ok());
+        }
+
+        status = file_writer->close();
+        ASSERT_TRUE(status.ok());
+    }
+
+    auto file_reader = file->create_reader(_runtime_state.get(), _profile.get());
+    status = file_reader->open();
+    ASSERT_TRUE(status.ok());
+
+    // Position on block 2 (0-based) before the first read.
+    file_reader->seek(2);
+
+    bool reached_end = false;
+    Block output;
+    status = file_reader->read(&output, &reached_end);
+    ASSERT_TRUE(status.ok());
+    ASSERT_FALSE(reached_end);
+    ASSERT_EQ(output.rows(), 3);
+
+    auto values = output.get_by_position(0).column;
+    ASSERT_EQ(values->get_int(0), 20);
+    ASSERT_EQ(values->get_int(1), 21);
+    ASSERT_EQ(values->get_int(2), 22);
+
+    status = file_reader->close();
+    ASSERT_TRUE(status.ok());
+}
+
+// Seeking past the last block is not an error: the next read must return OK
+// with eos set.
+TEST_F(SpillFileTest, SeekBeyondEnd) {
+    SpillFileSPtr file;
+    auto status = ExecEnv::GetInstance()->spill_file_mgr()->create_spill_file(
+            "test_query/seek_beyond", file);
+    ASSERT_TRUE(status.ok());
+
+    // Write phase: three single-row blocks.
+    {
+        SpillFileWriterSPtr file_writer;
+        status = file->create_writer(_runtime_state.get(), _profile.get(), file_writer);
+        ASSERT_TRUE(status.ok());
+
+        for (int i = 0; i < 3; ++i) {
+            auto input = _create_int_block({i});
+            status = file_writer->write_block(_runtime_state.get(), input);
+            ASSERT_TRUE(status.ok());
+        }
+
+        status = file_writer->close();
+        ASSERT_TRUE(status.ok());
+    }
+
+    auto file_reader = file->create_reader(_runtime_state.get(), _profile.get());
+    status = file_reader->open();
+    ASSERT_TRUE(status.ok());
+
+    // Index 100 is far beyond the three blocks written above.
+    file_reader->seek(100);
+
+    bool reached_end = false;
+    Block output;
+    status = file_reader->read(&output, &reached_end);
+    ASSERT_TRUE(status.ok());
+    ASSERT_TRUE(reached_end);
+
+    status = file_reader->close();
+    ASSERT_TRUE(status.ok());
+}
+
+// ═══════════════════════════════════════════════════════════════════════
+// SpillFile GC/lifecycle tests
+// ═══════════════════════════════════════════════════════════════════════
+
+// The spill directory must exist while the SpillFile is alive and must be
+// gone once the SpillFile (and its writer) have gone out of scope.
+TEST_F(SpillFileTest, GCCleansUpFiles) {
+    std::string spill_file_dir;
+
+    {
+        SpillFileSPtr file;
+        auto status = ExecEnv::GetInstance()->spill_file_mgr()->create_spill_file(
+                "test_query/gc_test", file);
+        ASSERT_TRUE(status.ok());
+
+        SpillFileWriterSPtr file_writer;
+        status = file->create_writer(_runtime_state.get(), _profile.get(), file_writer);
+        ASSERT_TRUE(status.ok());
+
+        auto payload = _create_int_block({1, 2, 3});
+        status = file_writer->write_block(_runtime_state.get(), payload);
+        ASSERT_TRUE(status.ok());
+
+        status = file_writer->close();
+        ASSERT_TRUE(status.ok());
+
+        // Record the on-disk location so it can be probed after the scope ends.
+        spill_file_dir = _data_dir_ptr->get_spill_data_path() + "/test_query/gc_test";
+
+        // While the file object is alive the directory must be present.
+        bool present = false;
+        status = io::global_local_filesystem()->exists(spill_file_dir, &present);
+        ASSERT_TRUE(status.ok());
+        ASSERT_TRUE(present);
+
+        // Leaving this scope releases the SpillFile; its destructor is
+        // expected to run gc() and remove the directory.
+    }
+
+    // The directory must no longer exist once the SpillFile is destroyed.
+    bool still_present = false;
+    auto status = io::global_local_filesystem()->exists(spill_file_dir, &still_present);
+    ASSERT_TRUE(status.ok());
+    ASSERT_FALSE(still_present);
+}
+
+// Smoke test: a written and closed spill file can be handed to the manager
+// for deletion, followed by a manual gc() call.
+TEST_F(SpillFileTest, DeleteSpillFileThroughManager) {
+    SpillFileSPtr file;
+    auto status = ExecEnv::GetInstance()->spill_file_mgr()->create_spill_file(
+            "test_query/mgr_delete", file);
+    ASSERT_TRUE(status.ok());
+
+    SpillFileWriterSPtr file_writer;
+    status = file->create_writer(_runtime_state.get(), _profile.get(), file_writer);
+    ASSERT_TRUE(status.ok());
+
+    auto payload = _create_int_block({1, 2, 3});
+    status = file_writer->write_block(_runtime_state.get(), payload);
+    ASSERT_TRUE(status.ok());
+
+    status = file_writer->close();
+    ASSERT_TRUE(status.ok());
+
+    // Queue the file for deletion via the manager (async GC) ...
+    ExecEnv::GetInstance()->spill_file_mgr()->delete_spill_file(file);
+
+    // ... then run GC so the queued deletion gets processed.
+    ExecEnv::GetInstance()->spill_file_mgr()->gc(1000);
+}
+
+// ═══════════════════════════════════════════════════════════════════════
+// SpillFileManager tests
+// ═══════════════════════════════════════════════════════════════════════
+
+// Three consecutive next_id() calls must return values that each increase by one.
+TEST_F(SpillFileTest, ManagerNextId) {
+    const auto first = ExecEnv::GetInstance()->spill_file_mgr()->next_id();
+    const auto second = ExecEnv::GetInstance()->spill_file_mgr()->next_id();
+    const auto third = ExecEnv::GetInstance()->spill_file_mgr()->next_id();
+
+    ASSERT_EQ(second, first + 1);
+    ASSERT_EQ(third, second + 1);
+}
+
+// Creates five spill files, writes a distinct single-row block to each, then
+// reads each one back and checks that it holds its own value (i * 100).
+TEST_F(SpillFileTest, ManagerCreateMultipleFiles) {
+    const int num_files = 5;
+    std::vector<SpillFileSPtr> spill_files;
+
+    // Phase 1: create all files first.
+    for (int i = 0; i < num_files; ++i) {
+        SpillFileSPtr created;
+        auto status = ExecEnv::GetInstance()->spill_file_mgr()->create_spill_file(
+                fmt::format("test_query/multi_{}", i), created);
+        ASSERT_TRUE(status.ok()) << "create file " << i << " failed: " << status.to_string();
+        spill_files.push_back(created);
+    }
+
+    // Phase 2: write one block to each file and close its writer.
+    for (int i = 0; i < num_files; ++i) {
+        SpillFileWriterSPtr file_writer;
+        auto status =
+                spill_files[i]->create_writer(_runtime_state.get(), _profile.get(), file_writer);
+        ASSERT_TRUE(status.ok());
+
+        auto input = _create_int_block({i * 100});
+        status = file_writer->write_block(_runtime_state.get(), input);
+        ASSERT_TRUE(status.ok());
+
+        status = file_writer->close();
+        ASSERT_TRUE(status.ok());
+    }
+
+    // Phase 3: read every file back and check its single value.
+    for (int i = 0; i < num_files; ++i) {
+        auto file_reader = spill_files[i]->create_reader(_runtime_state.get(), _profile.get());
+        auto status = file_reader->open();
+        ASSERT_TRUE(status.ok());
+
+        bool reached_end = false;
+        Block output;
+        status = file_reader->read(&output, &reached_end);
+        ASSERT_TRUE(status.ok());
+        ASSERT_EQ(output.rows(), 1);
+
+        auto values = output.get_by_position(0).column;
+        ASSERT_EQ(values->get_int(0), i * 100);
+
+        status = file_reader->close();
+        ASSERT_TRUE(status.ok());
+    }
+}
+
+// ═══════════════════════════════════════════════════════════════════════
+// Profile counter tests
+// ═══════════════════════════════════════════════════════════════════════
+
+// After writing a 5-row and a 3-row block, the write-side profile counters
+// must show 8 rows, 2 blocks and a positive byte count.
+TEST_F(SpillFileTest, WriteCounters) {
+    SpillFileSPtr file;
+    auto status = ExecEnv::GetInstance()->spill_file_mgr()->create_spill_file(
+            "test_query/counters", file);
+    ASSERT_TRUE(status.ok());
+
+    SpillFileWriterSPtr file_writer;
+    status = file->create_writer(_runtime_state.get(), _profile.get(), file_writer);
+    ASSERT_TRUE(status.ok());
+
+    auto five_rows = _create_int_block({1, 2, 3, 4, 5});
+    status = file_writer->write_block(_runtime_state.get(), five_rows);
+    ASSERT_TRUE(status.ok());
+
+    auto three_rows = _create_int_block({10, 20, 30});
+    status = file_writer->write_block(_runtime_state.get(), three_rows);
+    ASSERT_TRUE(status.ok());
+
+    status = file_writer->close();
+    ASSERT_TRUE(status.ok());
+
+    // Rows: 5 + 3.
+    auto* rows_counter = _custom_profile->get_counter("SpillWriteRows");
+    ASSERT_TRUE(rows_counter != nullptr);
+    ASSERT_EQ(rows_counter->value(), 8);
+
+    // Blocks: one per write_block() call.
+    auto* blocks_counter = _custom_profile->get_counter("SpillWriteBlockCount");
+    ASSERT_TRUE(blocks_counter != nullptr);
+    ASSERT_EQ(blocks_counter->value(), 2);
+
+    // Bytes: only required to be non-zero.
+    auto* bytes_counter = _custom_profile->get_counter("SpillWriteFileBytes");
+    ASSERT_TRUE(bytes_counter != nullptr);
+    ASSERT_GT(bytes_counter->value(), 0);
+}
+
+// After reading back a single 5-row block, the read-side profile counters
+// must show 1 block, 5 rows and a positive byte count.
+TEST_F(SpillFileTest, ReadCounters) {
+    SpillFileSPtr file;
+    auto status = ExecEnv::GetInstance()->spill_file_mgr()->create_spill_file(
+            "test_query/read_counters", file);
+    ASSERT_TRUE(status.ok());
+
+    // Write phase.
+    {
+        SpillFileWriterSPtr file_writer;
+        status = file->create_writer(_runtime_state.get(), _profile.get(), file_writer);
+        ASSERT_TRUE(status.ok());
+
+        auto input = _create_int_block({1, 2, 3, 4, 5});
+        status = file_writer->write_block(_runtime_state.get(), input);
+        ASSERT_TRUE(status.ok());
+
+        status = file_writer->close();
+        ASSERT_TRUE(status.ok());
+    }
+
+    // Read phase: a single read followed by close.
+    auto file_reader = file->create_reader(_runtime_state.get(), _profile.get());
+    status = file_reader->open();
+    ASSERT_TRUE(status.ok());
+
+    bool reached_end = false;
+    Block output;
+    status = file_reader->read(&output, &reached_end);
+    ASSERT_TRUE(status.ok());
+
+    status = file_reader->close();
+    ASSERT_TRUE(status.ok());
+
+    auto* blocks_counter = _custom_profile->get_counter("SpillReadBlockCount");
+    ASSERT_TRUE(blocks_counter != nullptr);
+    ASSERT_EQ(blocks_counter->value(), 1);
+
+    auto* rows_counter = _custom_profile->get_counter("SpillReadRows");
+    ASSERT_TRUE(rows_counter != nullptr);
+    ASSERT_EQ(rows_counter->value(), 5);
+
+    // Bytes: only required to be non-zero.
+    auto* bytes_counter = _custom_profile->get_counter("SpillReadFileBytes");
+    ASSERT_TRUE(bytes_counter != nullptr);
+    ASSERT_GT(bytes_counter->value(), 0);
+}
+
+// ═══════════════════════════════════════════════════════════════════════
+// SpillDataDir tests
+// ═══════════════════════════════════════════════════════════════════════
+
+// The data dir's reported spill byte count must grow after a 1000-row block
+// has been written and the writer closed.
+TEST_F(SpillFileTest, DataDirCapacityTracking) {
+    SpillFileSPtr file;
+    auto status = ExecEnv::GetInstance()->spill_file_mgr()->create_spill_file(
+            "test_query/capacity", file);
+    ASSERT_TRUE(status.ok());
+
+    // Baseline, taken after the file is created but before anything is written.
+    auto bytes_before = _data_dir_ptr->get_spill_data_bytes();
+
+    SpillFileWriterSPtr file_writer;
+    status = file->create_writer(_runtime_state.get(), _profile.get(), file_writer);
+    ASSERT_TRUE(status.ok());
+
+    // 1000 consecutive integers: enough payload to move the usage figure.
+    std::vector<int32_t> values(1000);
+    std::iota(values.begin(), values.end(), 0);
+    auto input = _create_int_block(values);
+    status = file_writer->write_block(_runtime_state.get(), input);
+    ASSERT_TRUE(status.ok());
+
+    status = file_writer->close();
+    ASSERT_TRUE(status.ok());
+
+    auto bytes_after = _data_dir_ptr->get_spill_data_bytes();
+    ASSERT_GT(bytes_after, bytes_before);
+}
+
+} // namespace doris::vectorized


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to