This is an automated email from the ASF dual-hosted git repository. yiguolei pushed a commit to branch spill_repartition in repository https://gitbox.apache.org/repos/asf/doris.git
commit 3d35204aae3bc7a35d0394b12f70254729fad659 Author: yiguolei <[email protected]> AuthorDate: Wed Mar 4 15:11:56 2026 +0800 refactor code and add unit test --- .../exec/partitioned_aggregation_sink_operator.cpp | 89 +- .../exec/partitioned_aggregation_sink_operator.h | 33 +- .../partitioned_aggregation_source_operator.cpp | 20 +- .../exec/partitioned_aggregation_source_operator.h | 20 +- be/test/vec/spill/spill_file_test.cpp | 1040 ++++++++++++++++++++ 5 files changed, 1122 insertions(+), 80 deletions(-) diff --git a/be/src/pipeline/exec/partitioned_aggregation_sink_operator.cpp b/be/src/pipeline/exec/partitioned_aggregation_sink_operator.cpp index fd785f78335..c2e36dba2b8 100644 --- a/be/src/pipeline/exec/partitioned_aggregation_sink_operator.cpp +++ b/be/src/pipeline/exec/partitioned_aggregation_sink_operator.cpp @@ -51,15 +51,15 @@ Status PartitionedAggSinkLocalState::init(doris::RuntimeState* state, auto& parent = Base::_parent->template cast<Parent>(); _spill_writers.resize(parent._partition_count); - RETURN_IF_ERROR(setup_in_memory_agg_op(state)); + RETURN_IF_ERROR(_setup_in_memory_agg_op(state)); for (const auto& probe_expr_ctx : Base::_shared_state->_in_mem_shared_state->probe_expr_ctxs) { - key_columns_.emplace_back(probe_expr_ctx->root()->data_type()->create_column()); + _key_columns.emplace_back(probe_expr_ctx->root()->data_type()->create_column()); } for (const auto& aggregate_evaluator : Base::_shared_state->_in_mem_shared_state->aggregate_evaluators) { - value_data_types_.emplace_back(aggregate_evaluator->function()->get_serialized_type()); - value_columns_.emplace_back(aggregate_evaluator->function()->create_serialize_column()); + _value_data_types.emplace_back(aggregate_evaluator->function()->get_serialized_type()); + _value_columns.emplace_back(aggregate_evaluator->function()->create_serialize_column()); } _rows_in_partitions.assign(parent._partition_count, 0); return Status::OK(); @@ -102,7 +102,7 @@ void PartitionedAggSinkLocalState::_init_counters() { update_profile_from_inner_profile<spilled>(name, custom_profile(), child_profile) template <bool spilled> -void PartitionedAggSinkLocalState::update_profile(RuntimeProfile* child_profile) { +void PartitionedAggSinkLocalState::_update_profile(RuntimeProfile* child_profile) { UPDATE_PROFILE("MemoryUsageHashTable"); UPDATE_PROFILE("MemoryUsageSerializeKeyArena"); UPDATE_PROFILE("BuildTime"); @@ -159,7 +159,7 @@ Status PartitionedAggSinkOperatorX::sink(doris::RuntimeState* state, vectorized: } } else { auto* sink_local_state = local_state._runtime_state->get_sink_local_state(); - local_state.update_profile<false>(sink_local_state->custom_profile()); + local_state._update_profile<false>(sink_local_state->custom_profile()); } // finally perform EOS bookkeeping @@ -190,7 +190,7 @@ Status PartitionedAggSinkOperatorX::sink(doris::RuntimeState* state, vectorized: Status PartitionedAggSinkOperatorX::revoke_memory(RuntimeState* state) { auto& local_state = get_local_state(state); - return local_state.revoke_memory(state); + return local_state._revoke_memory(state); } size_t PartitionedAggSinkOperatorX::revocable_mem_size(RuntimeState* state) const { @@ -204,7 +204,7 @@ size_t PartitionedAggSinkOperatorX::revocable_mem_size(RuntimeState* state) cons return size > state->spill_min_revocable_mem() ? size : 0; } -Status PartitionedAggSinkLocalState::setup_in_memory_agg_op(RuntimeState* state) { +Status PartitionedAggSinkLocalState::_setup_in_memory_agg_op(RuntimeState* state) { _runtime_state = RuntimeState::create_unique( state->fragment_instance_id(), state->query_id(), state->fragment_id(), state->query_options(), TQueryGlobals {}, state->exec_env(), state->get_query_ctx()); @@ -244,18 +244,19 @@ size_t PartitionedAggSinkOperatorX::get_reserve_mem_size(RuntimeState* state, bo } template <typename HashTableCtxType, typename KeyType> -Status PartitionedAggSinkLocalState::to_block(HashTableCtxType& context, std::vector<KeyType>& keys, - std::vector<vectorized::AggregateDataPtr>& values, - const vectorized::AggregateDataPtr null_key_data) { +Status PartitionedAggSinkLocalState::_to_block(HashTableCtxType& context, + std::vector<KeyType>& keys, + std::vector<vectorized::AggregateDataPtr>& values, + const vectorized::AggregateDataPtr null_key_data) { SCOPED_TIMER(_spill_serialize_hash_table_timer); - context.insert_keys_into_columns(keys, key_columns_, (uint32_t)keys.size()); + context.insert_keys_into_columns(keys, _key_columns, (uint32_t)keys.size()); if (null_key_data) { // only one key of group by support wrap null key // here need additional processing logic on the null key / value - CHECK(key_columns_.size() == 1); - CHECK(key_columns_[0]->is_nullable()); - key_columns_[0]->insert_data(nullptr, 0); + CHECK(_key_columns.size() == 1); + CHECK(_key_columns[0]->is_nullable()); + _key_columns[0]->insert_data(nullptr, 0); values.emplace_back(null_key_data); } @@ -267,33 +268,33 @@ Status PartitionedAggSinkLocalState::to_block(HashTableCtxType& context, std::ve ->serialize_to_column( values, Base::_shared_state->_in_mem_shared_state->offsets_of_aggregate_states[i], - value_columns_[i], values.size()); + _value_columns[i], values.size()); } vectorized::ColumnsWithTypeAndName key_columns_with_schema; - for (int i = 0; i < key_columns_.size(); ++i) { + for (int i = 0; i < _key_columns.size(); ++i) { key_columns_with_schema.emplace_back( - std::move(key_columns_[i]), + std::move(_key_columns[i]), Base::_shared_state->_in_mem_shared_state->probe_expr_ctxs[i]->root()->data_type(), Base::_shared_state->_in_mem_shared_state->probe_expr_ctxs[i]->root()->expr_name()); } - key_block_ = key_columns_with_schema; + _key_block = key_columns_with_schema; vectorized::ColumnsWithTypeAndName value_columns_with_schema; - for (int i = 0; i < value_columns_.size(); ++i) { + for (int i = 0; i < _value_columns.size(); ++i) { value_columns_with_schema.emplace_back( - std::move(value_columns_[i]), value_data_types_[i], + std::move(_value_columns[i]), _value_data_types[i], Base::_shared_state->_in_mem_shared_state->aggregate_evaluators[i] ->function() ->get_name()); } - value_block_ = value_columns_with_schema; + _value_block = value_columns_with_schema; - for (const auto& column : key_block_.get_columns_with_type_and_name()) { - block_.insert(column); + for (const auto& column : _key_block.get_columns_with_type_and_name()) { + _block.insert(column); } - for (const auto& column : value_block_.get_columns_with_type_and_name()) { - block_.insert(column); + for (const auto& column : _value_block.get_columns_with_type_and_name()) { + _block.insert(column); } return Status::OK(); } @@ -303,7 +304,7 @@ Status PartitionedAggSinkLocalState::_spill_partition( RuntimeState* state, HashTableCtxType& context, size_t partition_idx, std::vector<KeyType>& keys, std::vector<vectorized::AggregateDataPtr>& values, const vectorized::AggregateDataPtr null_key_data, bool is_last) { - auto status = to_block(context, keys, values, null_key_data); + auto status = _to_block(context, keys, values, null_key_data); RETURN_IF_ERROR(status); if (is_last) { @@ -335,7 +336,7 @@ Status PartitionedAggSinkLocalState::_spill_partition( RETURN_IF_ERROR(spill_file->create_writer(state, Base::operator_profile(), writer)); } - RETURN_IF_ERROR(writer->write_block(state, block_)); + RETURN_IF_ERROR(writer->write_block(state, _block)); _reset_tmp_data(); return Status::OK(); } @@ -411,7 +412,7 @@ Status PartitionedAggSinkLocalState::_spill_hash_table(RuntimeState* state, return Status::OK(); } -Status PartitionedAggSinkLocalState::revoke_memory(RuntimeState* state) { +Status PartitionedAggSinkLocalState::_revoke_memory(RuntimeState* state) { if (_eos) { return Status::OK(); } @@ -425,9 +426,9 @@ Status PartitionedAggSinkLocalState::revoke_memory(RuntimeState* state) { if (!_shared_state->_is_spilled) { _shared_state->_is_spilled = true; custom_profile()->add_info_string("Spilled", "true"); - update_profile<false>(sink_local_state->custom_profile()); + _update_profile<false>(sink_local_state->custom_profile()); } else { - update_profile<false>(sink_local_state->custom_profile()); + _update_profile<false>(sink_local_state->custom_profile()); } DBUG_EXECUTE_IF("fault_inject::partitioned_agg_sink::revoke_memory_submit_func", { @@ -493,39 +494,39 @@ Status PartitionedAggSinkLocalState::revoke_memory(RuntimeState* state) { } void PartitionedAggSinkLocalState::_reset_tmp_data() { - block_.clear(); - key_columns_.clear(); - value_columns_.clear(); - key_block_.clear_column_data(); - value_block_.clear_column_data(); - key_columns_ = key_block_.mutate_columns(); - value_columns_ = value_block_.mutate_columns(); + _block.clear(); + _key_columns.clear(); + _value_columns.clear(); + _key_block.clear_column_data(); + _value_block.clear_column_data(); + _key_columns = _key_block.mutate_columns(); + _value_columns = _value_block.mutate_columns(); } void PartitionedAggSinkLocalState::_clear_tmp_data() { { vectorized::Block empty_block; - block_.swap(empty_block); + _block.swap(empty_block); } { vectorized::Block empty_block; - key_block_.swap(empty_block); + _key_block.swap(empty_block); } { vectorized::Block empty_block; - value_block_.swap(empty_block); + _value_block.swap(empty_block); } { vectorized::MutableColumns cols; - key_columns_.swap(cols); + _key_columns.swap(cols); } { vectorized::MutableColumns cols; - value_columns_.swap(cols); + _value_columns.swap(cols); } vectorized::DataTypes tmp_value_data_types; - value_data_types_.swap(tmp_value_data_types); + _value_data_types.swap(tmp_value_data_types); } bool PartitionedAggSinkLocalState::is_blockable() const { diff --git a/be/src/pipeline/exec/partitioned_aggregation_sink_operator.h b/be/src/pipeline/exec/partitioned_aggregation_sink_operator.h index 5538c296562..6b2da40f5ff 100644 --- a/be/src/pipeline/exec/partitioned_aggregation_sink_operator.h +++ b/be/src/pipeline/exec/partitioned_aggregation_sink_operator.h @@ -42,20 +42,21 @@ public: PartitionedAggSinkLocalState(DataSinkOperatorXBase* parent, RuntimeState* state); ~PartitionedAggSinkLocalState() override = default; - friend class PartitionedAggSinkOperatorX; - Status init(RuntimeState* state, LocalSinkStateInfo& info) override; Status open(RuntimeState* state) override; Status close(RuntimeState* state, Status exec_status) override; - Status revoke_memory(RuntimeState* state); + bool is_blockable() const override; - Status setup_in_memory_agg_op(RuntimeState* state); +private: + friend class PartitionedAggSinkOperatorX; - template <bool spilled> - void update_profile(RuntimeProfile* child_profile); + Status _revoke_memory(RuntimeState* state); - bool is_blockable() const override; + Status _setup_in_memory_agg_op(RuntimeState* state); + + template <bool spilled> + void _update_profile(RuntimeProfile* child_profile); template <typename KeyType> struct TmpSpillInfo { @@ -74,9 +75,9 @@ public: const vectorized::AggregateDataPtr null_key_data, bool is_last); template <typename HashTableCtxType, typename KeyType> - Status to_block(HashTableCtxType& context, std::vector<KeyType>& keys, - std::vector<vectorized::AggregateDataPtr>& values, - const vectorized::AggregateDataPtr null_key_data); + Status _to_block(HashTableCtxType& context, std::vector<KeyType>& keys, + std::vector<vectorized::AggregateDataPtr>& values, + const vectorized::AggregateDataPtr null_key_data); void _reset_tmp_data(); void _clear_tmp_data(); @@ -85,12 +86,12 @@ public: std::unique_ptr<RuntimeState> _runtime_state; // temp structures during spilling - vectorized::MutableColumns key_columns_; - vectorized::MutableColumns value_columns_; - vectorized::DataTypes value_data_types_; - vectorized::Block block_; - vectorized::Block key_block_; - vectorized::Block value_block_; + vectorized::MutableColumns _key_columns; + vectorized::MutableColumns _value_columns; + vectorized::DataTypes _value_data_types; + vectorized::Block _block; + vectorized::Block _key_block; + vectorized::Block _value_block; std::unique_ptr<RuntimeProfile> _internal_runtime_profile; RuntimeProfile::Counter* _memory_usage_reserved = nullptr; diff --git a/be/src/pipeline/exec/partitioned_aggregation_source_operator.cpp b/be/src/pipeline/exec/partitioned_aggregation_source_operator.cpp index fe2ebb2df78..9f87efa490e 100644 --- a/be/src/pipeline/exec/partitioned_aggregation_source_operator.cpp +++ b/be/src/pipeline/exec/partitioned_aggregation_source_operator.cpp @@ -67,7 +67,7 @@ Status PartitionedAggLocalState::open(RuntimeState* state) { return Status::OK(); } _opened = true; - RETURN_IF_ERROR(setup_in_memory_agg_op(state)); + RETURN_IF_ERROR(_setup_in_memory_agg_op(state)); return Status::OK(); } @@ -76,7 +76,7 @@ Status PartitionedAggLocalState::open(RuntimeState* state) { update_profile_from_inner_profile<spilled>(name, custom_profile(), child_profile) template <bool spilled> -void PartitionedAggLocalState::update_profile(RuntimeProfile* child_profile) { +void PartitionedAggLocalState::_update_profile(RuntimeProfile* child_profile) { UPDATE_COUNTER_FROM_INNER("GetResultsTime"); UPDATE_COUNTER_FROM_INNER("HashTableIterateTime"); UPDATE_COUNTER_FROM_INNER("InsertKeysToColumnTime"); @@ -219,7 +219,7 @@ Status PartitionedAggSourceOperatorX::revoke_memory(RuntimeState* state) { PrettyPrinter::print_bytes(local_state._estimate_memory_usage)); // Flush hash table + repartition remaining spill files of the current partition. - RETURN_IF_ERROR(local_state.flush_and_repartition(state)); + RETURN_IF_ERROR(local_state._flush_and_repartition(state)); local_state._current_partition = AggSpillPartitionInfo {}; local_state._need_to_setup_partition = true; return Status::OK(); @@ -241,7 +241,7 @@ Status PartitionedAggSourceOperatorX::get_block(RuntimeState* state, vectorized: if (*eos) { auto* source_local_state = runtime_state->get_local_state(_agg_source_operator->operator_id()); - local_state.update_profile<false>(source_local_state->custom_profile()); + local_state._update_profile<false>(source_local_state->custom_profile()); } local_state.reached_limit(block, eos); return Status::OK(); @@ -314,7 +314,7 @@ Status PartitionedAggSourceOperatorX::get_block(RuntimeState* state, vectorized: if (inner_eos) { auto* source_local_state = runtime_state->get_local_state(_agg_source_operator->operator_id()); - local_state.update_profile<true>(source_local_state->custom_profile()); + local_state._update_profile<true>(source_local_state->custom_profile()); // Current partition fully output. Reset hash table, pop next partition. RETURN_IF_ERROR(_agg_source_operator->reset_hash_table(runtime_state)); @@ -387,7 +387,7 @@ Status PartitionedAggLocalState::_recover_blocks_from_partition(RuntimeState* st return Status::OK(); } -Status PartitionedAggLocalState::setup_in_memory_agg_op(RuntimeState* state) { +Status PartitionedAggLocalState::_setup_in_memory_agg_op(RuntimeState* state) { _runtime_state = RuntimeState::create_unique( state->fragment_instance_id(), state->query_id(), state->fragment_id(), state->query_options(), TQueryGlobals {}, state->exec_env(), state->get_query_ctx()); @@ -416,7 +416,7 @@ Status PartitionedAggLocalState::setup_in_memory_agg_op(RuntimeState* state) { return source_local_state->open(state); } -Status PartitionedAggLocalState::flush_hash_table_to_sub_spill_files(RuntimeState* state) { +Status PartitionedAggLocalState::_flush_hash_table_to_sub_spill_files(RuntimeState* state) { auto* runtime_state = _runtime_state.get(); auto& p = _parent->cast<PartitionedAggSourceOperatorX>(); auto* in_mem_state = _shared_state->_in_mem_shared_state; @@ -439,7 +439,7 @@ Status PartitionedAggLocalState::flush_hash_table_to_sub_spill_files(RuntimeStat return Status::OK(); } -Status PartitionedAggLocalState::flush_and_repartition(RuntimeState* state) { +Status PartitionedAggLocalState::_flush_and_repartition(RuntimeState* state) { auto& p = _parent->cast<PartitionedAggSourceOperatorX>(); const int new_level = _current_partition.level + 1; @@ -459,7 +459,7 @@ Status PartitionedAggLocalState::flush_and_repartition(RuntimeState* state) { { auto* source_local_state = _runtime_state->get_local_state(p._agg_source_operator->operator_id()); - update_profile<true>(source_local_state->custom_profile()); + _update_profile<true>(source_local_state->custom_profile()); } // 1. Create FANOUT output sub-spill-files. @@ -485,7 +485,7 @@ Status PartitionedAggLocalState::flush_and_repartition(RuntimeState* state) { RETURN_IF_ERROR(_repartitioner.setup_output(state, output_spill_files)); // 2. Flush the in-memory hash table into the sub-spill-files. - RETURN_IF_ERROR(flush_hash_table_to_sub_spill_files(state)); + RETURN_IF_ERROR(_flush_hash_table_to_sub_spill_files(state)); // 3. Route any in-memory blocks that were recovered but not yet merged // into the hash table. These blocks were already read from the file diff --git a/be/src/pipeline/exec/partitioned_aggregation_source_operator.h b/be/src/pipeline/exec/partitioned_aggregation_source_operator.h index 518e59b5ac5..54f9aad3f8d 100644 --- a/be/src/pipeline/exec/partitioned_aggregation_source_operator.h +++ b/be/src/pipeline/exec/partitioned_aggregation_source_operator.h @@ -62,25 +62,25 @@ public: Status open(RuntimeState* state) override; Status close(RuntimeState* state) override; - Status setup_in_memory_agg_op(RuntimeState* state); + bool is_blockable() const override; - template <bool spilled> - void update_profile(RuntimeProfile* child_profile); +private: + friend class PartitionedAggSourceOperatorX; - bool is_blockable() const override; + Status _setup_in_memory_agg_op(RuntimeState* state); + + template <bool spilled> + void _update_profile(RuntimeProfile* child_profile); /// Flush the current in-memory hash table by draining it as blocks and routing /// each block through the repartitioner into the output sub-spill-files. - Status flush_hash_table_to_sub_spill_files(RuntimeState* state); + Status _flush_hash_table_to_sub_spill_files(RuntimeState* state); /// Flush the in-memory hash table into FANOUT sub-spill-files, repartition remaining /// unread spill files from `remaining_spill_files`, and push resulting sub-partitions into /// `_partition_queue`. After this call the hash table is reset and /// `remaining_spill_files` is cleared. - Status flush_and_repartition(RuntimeState* state); - -private: - friend class PartitionedAggSourceOperatorX; + Status _flush_and_repartition(RuntimeState* state); /// Move all original spill_partitions from shared state into `_partition_queue`. /// Called once when spilled get_block is first entered. @@ -106,7 +106,7 @@ private: std::vector<vectorized::Block> _blocks; // Estimated in-memory hash table size for the current partition. - //size_t _estimate_memory_usage = 0; + size_t _estimate_memory_usage = 0; // Counters to track spill partition metrics RuntimeProfile::Counter* _max_partition_level = nullptr; diff --git a/be/test/vec/spill/spill_file_test.cpp b/be/test/vec/spill/spill_file_test.cpp new file mode 100644 index 00000000000..2313ed53614 --- /dev/null +++ b/be/test/vec/spill/spill_file_test.cpp @@ -0,0 +1,1040 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "vec/spill/spill_file.h" + +#include <gtest/gtest.h> + +#include <algorithm> +#include <memory> +#include <numeric> +#include <vector> + +#include "common/config.h" +#include "io/fs/local_file_system.h" +#include "runtime/exec_env.h" +#include "testutil/column_helper.h" +#include "testutil/mock/mock_runtime_state.h" +#include "util/runtime_profile.h" +#include "vec/core/block.h" +#include "vec/data_types/data_type_number.h" +#include "vec/data_types/data_type_string.h" +#include "vec/spill/spill_file_manager.h" +#include "vec/spill/spill_file_reader.h" +#include "vec/spill/spill_file_writer.h" + +namespace doris::vectorized { + +class SpillFileTest : public testing::Test { +protected: + void SetUp() override { + _runtime_state = std::make_unique<MockRuntimeState>(); + + _profile = std::make_unique<RuntimeProfile>("test"); + _custom_profile = std::make_unique<RuntimeProfile>("CustomCounters"); + _common_profile = std::make_unique<RuntimeProfile>("CommonCounters"); + + _common_profile->AddHighWaterMarkCounter("MemoryUsage", TUnit::BYTES, "", 1); + ADD_TIMER_WITH_LEVEL(_common_profile.get(), "ExecTime", 1); + + ADD_TIMER_WITH_LEVEL(_custom_profile.get(), "SpillTotalTime", 1); + ADD_TIMER_WITH_LEVEL(_custom_profile.get(), "SpillWriteTime", 1); + ADD_COUNTER_WITH_LEVEL(_custom_profile.get(), "SpillWriteTaskWaitInQueueCount", TUnit::UNIT, + 1); + ADD_COUNTER_WITH_LEVEL(_custom_profile.get(), "SpillWriteTaskCount", TUnit::UNIT, 1); + ADD_TIMER_WITH_LEVEL(_custom_profile.get(), "SpillWriteTaskWaitInQueueTime", 1); + ADD_TIMER_WITH_LEVEL(_custom_profile.get(), "SpillWriteFileTime", 1); + ADD_TIMER_WITH_LEVEL(_custom_profile.get(), "SpillWriteSerializeBlockTime", 1); + ADD_COUNTER_WITH_LEVEL(_custom_profile.get(), "SpillWriteBlockCount", TUnit::UNIT, 1); + ADD_COUNTER_WITH_LEVEL(_custom_profile.get(), "SpillWriteBlockBytes", TUnit::BYTES, 1); + ADD_COUNTER_WITH_LEVEL(_custom_profile.get(), "SpillWriteFileBytes", TUnit::BYTES, 1); + ADD_COUNTER_WITH_LEVEL(_custom_profile.get(), "SpillWriteRows", TUnit::UNIT, 1); + ADD_COUNTER_WITH_LEVEL(_custom_profile.get(), "SpillReadFileTime", TUnit::UNIT, 1); + ADD_COUNTER_WITH_LEVEL(_custom_profile.get(), "SpillReadDerializeBlockTime", TUnit::UNIT, + 1); + ADD_COUNTER_WITH_LEVEL(_custom_profile.get(), "SpillReadBlockCount", TUnit::UNIT, 1); + ADD_COUNTER_WITH_LEVEL(_custom_profile.get(), "SpillReadBlockBytes", TUnit::UNIT, 1); + ADD_COUNTER_WITH_LEVEL(_custom_profile.get(), "SpillReadFileBytes", TUnit::UNIT, 1); + ADD_COUNTER_WITH_LEVEL(_custom_profile.get(), "SpillReadRows", TUnit::UNIT, 1); + ADD_COUNTER_WITH_LEVEL(_custom_profile.get(), "SpillReadFileCount", TUnit::UNIT, 1); + ADD_COUNTER_WITH_LEVEL(_custom_profile.get(), "SpillWriteFileTotalCount", TUnit::UNIT, 1); + ADD_COUNTER_WITH_LEVEL(_custom_profile.get(), "SpillWriteFileCurrentCount", TUnit::UNIT, 1); + ADD_COUNTER_WITH_LEVEL(_custom_profile.get(), "SpillWriteFileCurrentBytes", TUnit::UNIT, 1); + + _profile->add_child(_custom_profile.get(), true); + _profile->add_child(_common_profile.get(), true); + + _spill_dir = "./ut_dir/spill_file_test"; + auto spill_data_dir = std::make_unique<SpillDataDir>(_spill_dir, 1024L * 1024 * 128); + auto st = io::global_local_filesystem()->create_directory(spill_data_dir->path(), false); + ASSERT_TRUE(st.ok()) << "create directory failed: " << st.to_string(); + + std::unordered_map<std::string, std::unique_ptr<SpillDataDir>> data_map; + _data_dir_ptr = spill_data_dir.get(); + data_map.emplace("test", std::move(spill_data_dir)); + auto* spill_file_manager = new SpillFileManager(std::move(data_map)); + ExecEnv::GetInstance()->_spill_file_mgr = spill_file_manager; + st = spill_file_manager->init(); + ASSERT_TRUE(st.ok()) << "init spill file manager failed: " << st.to_string(); + } + + void TearDown() override { + ExecEnv::GetInstance()->spill_file_mgr()->stop(); + SAFE_DELETE(ExecEnv::GetInstance()->_spill_file_mgr); + // Clean up test directory + auto st = io::global_local_filesystem()->delete_directory(_spill_dir); + (void)st; + _runtime_state.reset(); + } + + Block _create_int_block(const std::vector<int32_t>& data) { + return ColumnHelper::create_block<DataTypeInt32>(data); + } + + Block _create_two_column_block(const std::vector<int32_t>& col1, + const std::vector<int64_t>& col2) { + auto block = ColumnHelper::create_block<DataTypeInt32>(col1); + block.insert(ColumnHelper::create_column_with_name<DataTypeInt64>(col2)); + return block; + } + + std::unique_ptr<MockRuntimeState> _runtime_state; + std::unique_ptr<RuntimeProfile> _profile; + std::unique_ptr<RuntimeProfile> _custom_profile; + std::unique_ptr<RuntimeProfile> _common_profile; + std::string _spill_dir; + SpillDataDir* _data_dir_ptr = nullptr; +}; + +// ═══════════════════════════════════════════════════════════════════════ +// SpillFile basic tests +// ═══════════════════════════════════════════════════════════════════════ + +TEST_F(SpillFileTest, CreateSpillFile) { + SpillFileSPtr spill_file; + auto st = ExecEnv::GetInstance()->spill_file_mgr()->create_spill_file("test_query/test_file", + spill_file); + ASSERT_TRUE(st.ok()) << st.to_string(); + ASSERT_TRUE(spill_file != nullptr); + ASSERT_FALSE(spill_file->ready_for_reading()); +} + +TEST_F(SpillFileTest, CreateWriterAndReader) { + SpillFileSPtr spill_file; + auto st = ExecEnv::GetInstance()->spill_file_mgr()->create_spill_file("test_query/create_wr", + spill_file); + ASSERT_TRUE(st.ok()) << st.to_string(); + + // Create writer + SpillFileWriterSPtr writer; + st = spill_file->create_writer(_runtime_state.get(), _profile.get(), writer); + ASSERT_TRUE(st.ok()) << st.to_string(); + ASSERT_TRUE(writer != nullptr); + + // Close writer with no data written + st = writer->close(); + ASSERT_TRUE(st.ok()) << st.to_string(); + ASSERT_TRUE(spill_file->ready_for_reading()); + + // Create reader on empty file (0 parts) + auto reader = spill_file->create_reader(_runtime_state.get(), _profile.get()); + ASSERT_TRUE(reader != nullptr); + + st = reader->open(); + ASSERT_TRUE(st.ok()) << st.to_string(); + + Block block; + bool eos = false; + st = reader->read(&block, &eos); + ASSERT_TRUE(st.ok()) << st.to_string(); + ASSERT_TRUE(eos); + + st = reader->close(); + ASSERT_TRUE(st.ok()) << st.to_string(); +} + +// ═══════════════════════════════════════════════════════════════════════ +// SpillFileWriter tests +// ═══════════════════════════════════════════════════════════════════════ + +TEST_F(SpillFileTest, WriteSingleBlock) { + SpillFileSPtr spill_file; + auto st = ExecEnv::GetInstance()->spill_file_mgr()->create_spill_file("test_query/single_block", + spill_file); + ASSERT_TRUE(st.ok()); + + SpillFileWriterSPtr writer; + st = spill_file->create_writer(_runtime_state.get(), _profile.get(), writer); + ASSERT_TRUE(st.ok()); + + auto block = _create_int_block({1, 2, 3, 4, 5}); + st = writer->write_block(_runtime_state.get(), block); + ASSERT_TRUE(st.ok()) << st.to_string(); + + st = writer->close(); + ASSERT_TRUE(st.ok()) << st.to_string(); + + ASSERT_TRUE(spill_file->ready_for_reading()); + + auto* write_rows_counter = _custom_profile->get_counter("SpillWriteRows"); + ASSERT_TRUE(write_rows_counter != nullptr); + ASSERT_EQ(write_rows_counter->value(), 5); +} + +TEST_F(SpillFileTest, WriteMultipleBlocks) { + SpillFileSPtr spill_file; + auto st = ExecEnv::GetInstance()->spill_file_mgr()->create_spill_file("test_query/multi_blocks", + spill_file); + ASSERT_TRUE(st.ok()); + + SpillFileWriterSPtr writer; + st = spill_file->create_writer(_runtime_state.get(), _profile.get(), writer); + ASSERT_TRUE(st.ok()); + + for (int i = 0; i < 5; ++i) { + auto block = _create_int_block({i * 10, i * 10 + 1, i * 10 + 2}); + st = writer->write_block(_runtime_state.get(), block); + ASSERT_TRUE(st.ok()) << "write block " << i << " failed: " << st.to_string(); + } + + st = writer->close(); + ASSERT_TRUE(st.ok()) << st.to_string(); + + auto* write_rows_counter = _custom_profile->get_counter("SpillWriteRows"); + ASSERT_EQ(write_rows_counter->value(), 15); + + auto* write_block_counter = _custom_profile->get_counter("SpillWriteBlockCount"); + ASSERT_EQ(write_block_counter->value(), 5); +} + +TEST_F(SpillFileTest, WriteTwoColumnBlock) { + SpillFileSPtr spill_file; + auto st = ExecEnv::GetInstance()->spill_file_mgr()->create_spill_file("test_query/two_col", + spill_file); + ASSERT_TRUE(st.ok()); + + SpillFileWriterSPtr writer; + st = spill_file->create_writer(_runtime_state.get(), _profile.get(), writer); + ASSERT_TRUE(st.ok()); + + auto block = _create_two_column_block({1, 2, 3}, {100, 200, 300}); + st = writer->write_block(_runtime_state.get(), block); + ASSERT_TRUE(st.ok()) << st.to_string(); + + st = writer->close(); + ASSERT_TRUE(st.ok()) << st.to_string(); + ASSERT_TRUE(spill_file->ready_for_reading()); +} + +TEST_F(SpillFileTest, WriteEmptyBlock) { + SpillFileSPtr spill_file; + auto st = ExecEnv::GetInstance()->spill_file_mgr()->create_spill_file("test_query/empty_block", + spill_file); + ASSERT_TRUE(st.ok()); + + SpillFileWriterSPtr writer; + st = spill_file->create_writer(_runtime_state.get(), _profile.get(), writer); + ASSERT_TRUE(st.ok()); + + Block empty_block; + st = writer->write_block(_runtime_state.get(), empty_block); + ASSERT_TRUE(st.ok()) << st.to_string(); + + st = writer->close(); + ASSERT_TRUE(st.ok()) << st.to_string(); +} + +TEST_F(SpillFileTest, DoubleCloseWriter) { + SpillFileSPtr spill_file; + auto st = ExecEnv::GetInstance()->spill_file_mgr()->create_spill_file("test_query/double_close", + spill_file); + ASSERT_TRUE(st.ok()); + + SpillFileWriterSPtr writer; + st = spill_file->create_writer(_runtime_state.get(), _profile.get(), writer); + ASSERT_TRUE(st.ok()); + + auto block = _create_int_block({1, 2, 3}); + st = writer->write_block(_runtime_state.get(), block); + ASSERT_TRUE(st.ok()); + + st = writer->close(); + ASSERT_TRUE(st.ok()); + + // Double close should be a no-op + st = writer->close(); + ASSERT_TRUE(st.ok()); +} + +// ═══════════════════════════════════════════════════════════════════════ +// SpillFileReader tests +// ═══════════════════════════════════════════════════════════════════════ + +TEST_F(SpillFileTest, ReadSingleBlock) { + SpillFileSPtr spill_file; + auto st = ExecEnv::GetInstance()->spill_file_mgr()->create_spill_file("test_query/read_single", + spill_file); + ASSERT_TRUE(st.ok()); + + // Write + { + SpillFileWriterSPtr writer; + st = spill_file->create_writer(_runtime_state.get(), _profile.get(), writer); + ASSERT_TRUE(st.ok()); + + auto block = _create_int_block({10, 20, 30, 40, 50}); + st = writer->write_block(_runtime_state.get(), block); + ASSERT_TRUE(st.ok()); + + st = writer->close(); + ASSERT_TRUE(st.ok()); + } + + // Read + auto reader = spill_file->create_reader(_runtime_state.get(), _profile.get()); + st = reader->open(); + ASSERT_TRUE(st.ok()) << st.to_string(); + + Block block; + bool eos = false; + st = reader->read(&block, &eos); + ASSERT_TRUE(st.ok()) << st.to_string(); + ASSERT_FALSE(eos); + ASSERT_EQ(block.rows(), 5); + + // Verify data + auto col = block.get_by_position(0).column; + ASSERT_EQ(col->get_int(0), 10); + ASSERT_EQ(col->get_int(1), 20); + ASSERT_EQ(col->get_int(2), 30); + ASSERT_EQ(col->get_int(3), 40); + ASSERT_EQ(col->get_int(4), 50); + + // Next read should be EOS + Block block2; + st = reader->read(&block2, &eos); + ASSERT_TRUE(st.ok()); + ASSERT_TRUE(eos); + + st = reader->close(); + ASSERT_TRUE(st.ok()); +} + +TEST_F(SpillFileTest, ReadMultipleBlocks) { + SpillFileSPtr spill_file; + auto st = ExecEnv::GetInstance()->spill_file_mgr()->create_spill_file("test_query/read_multi", + spill_file); + ASSERT_TRUE(st.ok()); + + const int num_blocks = 10; + const int rows_per_block = 100; + + // Write + { + SpillFileWriterSPtr writer; + st = spill_file->create_writer(_runtime_state.get(), _profile.get(), writer); + ASSERT_TRUE(st.ok()); + + for (int b = 0; b < num_blocks; ++b) { + std::vector<int32_t> data(rows_per_block); + std::iota(data.begin(), data.end(), b * rows_per_block); + auto block = _create_int_block(data); + st = writer->write_block(_runtime_state.get(), block); + ASSERT_TRUE(st.ok()); + } + + st = writer->close(); + ASSERT_TRUE(st.ok()); + } + + // Read all blocks + auto reader = spill_file->create_reader(_runtime_state.get(), _profile.get()); + st = reader->open(); + ASSERT_TRUE(st.ok()); + + size_t total_rows = 0; + int block_count = 0; + bool eos = false; + while (!eos) { + Block block; + st = reader->read(&block, &eos); + ASSERT_TRUE(st.ok()) << st.to_string(); + if (!eos) { + total_rows += block.rows(); + ++block_count; + } + } + + ASSERT_EQ(total_rows, num_blocks * rows_per_block); + ASSERT_EQ(block_count, num_blocks); + + st = reader->close(); + ASSERT_TRUE(st.ok()); +} + +TEST_F(SpillFileTest, ReadTwoColumnBlock) { + SpillFileSPtr spill_file; + auto st = ExecEnv::GetInstance()->spill_file_mgr()->create_spill_file("test_query/read_two_col", + spill_file); + ASSERT_TRUE(st.ok()); + + // Write + { + SpillFileWriterSPtr writer; + st = spill_file->create_writer(_runtime_state.get(), _profile.get(), writer); + ASSERT_TRUE(st.ok()); + + auto block = _create_two_column_block({1, 2, 3, 4}, {100, 200, 300, 400}); + st = writer->write_block(_runtime_state.get(), block); + ASSERT_TRUE(st.ok()); + + st = writer->close(); + ASSERT_TRUE(st.ok()); + } + + // Read + auto reader = spill_file->create_reader(_runtime_state.get(), _profile.get()); + st = reader->open(); + ASSERT_TRUE(st.ok()); + + Block block; + bool eos = false; + st = reader->read(&block, &eos); + ASSERT_TRUE(st.ok()); + ASSERT_FALSE(eos); + ASSERT_EQ(block.rows(), 4); + ASSERT_EQ(block.columns(), 2); + + // Verify col1 + auto col1 = block.get_by_position(0).column; + ASSERT_EQ(col1->get_int(0), 1); + ASSERT_EQ(col1->get_int(3), 4); + + // Verify col2 + auto col2 = block.get_by_position(1).column; + ASSERT_EQ(col2->get_int(0), 100); + ASSERT_EQ(col2->get_int(3), 400); + + st = reader->close(); + ASSERT_TRUE(st.ok()); +} + +// ═══════════════════════════════════════════════════════════════════════ +// Roundtrip tests (write -> read -> verify) +// ═══════════════════════════════════════════════════════════════════════ + +TEST_F(SpillFileTest, RoundtripSingleBlock) { + SpillFileSPtr spill_file; + auto st = ExecEnv::GetInstance()->spill_file_mgr()->create_spill_file( + "test_query/roundtrip_single", spill_file); + ASSERT_TRUE(st.ok()); + + std::vector<int32_t> original_data = {42, 7, 99, 1, 0, -5, 1000}; + + // Write + { + SpillFileWriterSPtr writer; + st = spill_file->create_writer(_runtime_state.get(), _profile.get(), writer); + ASSERT_TRUE(st.ok()); + + auto block = _create_int_block(original_data); + st = writer->write_block(_runtime_state.get(), block); + ASSERT_TRUE(st.ok()); + + st = writer->close(); + ASSERT_TRUE(st.ok()); + } + + // Read & verify + auto reader = spill_file->create_reader(_runtime_state.get(), _profile.get()); + st = reader->open(); + ASSERT_TRUE(st.ok()); + + Block block; + bool eos = false; + st = reader->read(&block, &eos); + ASSERT_TRUE(st.ok()); + ASSERT_EQ(block.rows(), original_data.size()); + + auto col = block.get_by_position(0).column; + for (size_t i = 0; i < original_data.size(); ++i) { + ASSERT_EQ(col->get_int(i), original_data[i]) << "mismatch at index " << i; + } + + st = reader->close(); + ASSERT_TRUE(st.ok()); +} + +TEST_F(SpillFileTest, RoundtripMultipleBlocks) { + SpillFileSPtr spill_file; + auto st = ExecEnv::GetInstance()->spill_file_mgr()->create_spill_file( + "test_query/roundtrip_multi", spill_file); + ASSERT_TRUE(st.ok()); + + std::vector<std::vector<int32_t>> all_data = { + {1, 2, 3}, + {10, 20, 30, 40}, + {100, 200}, + {-1, -2, -3, -4, -5}, + }; + + // Write + { + SpillFileWriterSPtr writer; + st = spill_file->create_writer(_runtime_state.get(), _profile.get(), writer); + ASSERT_TRUE(st.ok()); + + for (const auto& data : all_data) { + auto block = _create_int_block(data); + st = writer->write_block(_runtime_state.get(), block); + ASSERT_TRUE(st.ok()); + } + + st = writer->close(); + ASSERT_TRUE(st.ok()); + } + + // Read & verify + auto reader = spill_file->create_reader(_runtime_state.get(), _profile.get()); + st = reader->open(); + ASSERT_TRUE(st.ok()); + + size_t block_idx = 0; + bool eos = false; + while (!eos && block_idx < all_data.size()) { + Block block; + st = reader->read(&block, &eos); + ASSERT_TRUE(st.ok()); + if (eos) break; + + ASSERT_EQ(block.rows(), all_data[block_idx].size()) + << "block " << block_idx << " row count mismatch"; + + auto col = block.get_by_position(0).column; + for (size_t i = 0; i < all_data[block_idx].size(); ++i) { + ASSERT_EQ(col->get_int(i), all_data[block_idx][i]) + << "mismatch at block " << block_idx << " row " << i; + } + ++block_idx; + } + + ASSERT_EQ(block_idx, all_data.size()); + + st = reader->close(); + ASSERT_TRUE(st.ok()); +} + +TEST_F(SpillFileTest, RoundtripLargeData) { + SpillFileSPtr spill_file; + auto st = ExecEnv::GetInstance()->spill_file_mgr()->create_spill_file( + "test_query/roundtrip_large", spill_file); + ASSERT_TRUE(st.ok()); + + const size_t row_count = 100000; + std::vector<int32_t> data(row_count); + std::iota(data.begin(), data.end(), 0); + + // Write + { + SpillFileWriterSPtr writer; + st = spill_file->create_writer(_runtime_state.get(), _profile.get(), writer); + ASSERT_TRUE(st.ok()); + + auto block = _create_int_block(data); + st = writer->write_block(_runtime_state.get(), block); + ASSERT_TRUE(st.ok()); + + st = writer->close(); + ASSERT_TRUE(st.ok()); + } + + // Read & verify + auto reader = spill_file->create_reader(_runtime_state.get(), _profile.get()); + st = reader->open(); + ASSERT_TRUE(st.ok()); + + Block block; + bool eos = false; + st = reader->read(&block, &eos); + ASSERT_TRUE(st.ok()); + ASSERT_EQ(block.rows(), row_count); + + auto col = block.get_by_position(0).column; + for (size_t i = 0; i < row_count; i += 1000) { + ASSERT_EQ(col->get_int(i), (int32_t)i) << "mismatch at index " << i; + } + + st = reader->close(); + ASSERT_TRUE(st.ok()); +} + +// ═══════════════════════════════════════════════════════════════════════ +// Part rotation tests +// ═══════════════════════════════════════════════════════════════════════ + +TEST_F(SpillFileTest, PartRotation) { + // Set a very small part size to force rotation + auto saved_part_size = config::spill_file_part_size_bytes; + config::spill_file_part_size_bytes = 1024; // 1KB per part + + SpillFileSPtr spill_file; + auto st = ExecEnv::GetInstance()->spill_file_mgr()->create_spill_file("test_query/rotation", + spill_file); + ASSERT_TRUE(st.ok()); + + const int num_blocks = 20; + + // Write many blocks to trigger multiple part rotations + { + SpillFileWriterSPtr writer; + st = spill_file->create_writer(_runtime_state.get(), _profile.get(), writer); + ASSERT_TRUE(st.ok()); + + for (int i = 0; i < num_blocks; ++i) { + std::vector<int32_t> data(100); + std::iota(data.begin(), data.end(), i * 100); + auto block = _create_int_block(data); + st = writer->write_block(_runtime_state.get(), block); + ASSERT_TRUE(st.ok()); + } + + st = writer->close(); + ASSERT_TRUE(st.ok()); + } + + // Read back and verify all data across multiple parts + auto reader = spill_file->create_reader(_runtime_state.get(), _profile.get()); + st = reader->open(); + ASSERT_TRUE(st.ok()); + + size_t total_rows = 0; + int block_count = 0; + bool eos = false; + while (!eos) { + Block block; + st = reader->read(&block, &eos); + ASSERT_TRUE(st.ok()); + if (!eos) { + total_rows += block.rows(); + ++block_count; + } + } + + ASSERT_EQ(total_rows, num_blocks * 100); + ASSERT_EQ(block_count, num_blocks); + + st = reader->close(); + ASSERT_TRUE(st.ok()); + + config::spill_file_part_size_bytes = saved_part_size; +} + +TEST_F(SpillFileTest, PartRotationDataIntegrity) { + // Set a small part size to force rotation + auto saved_part_size = config::spill_file_part_size_bytes; + config::spill_file_part_size_bytes = 512; + + SpillFileSPtr spill_file; + auto st = ExecEnv::GetInstance()->spill_file_mgr()->create_spill_file( + "test_query/rotation_integrity", spill_file); + ASSERT_TRUE(st.ok()); + + std::vector<std::vector<int32_t>> all_data; + const int num_blocks = 30; + + // Write + { + SpillFileWriterSPtr writer; + st = spill_file->create_writer(_runtime_state.get(), _profile.get(), writer); + ASSERT_TRUE(st.ok()); + + for (int i = 0; i < num_blocks; ++i) { + std::vector<int32_t> data(50); + std::iota(data.begin(), data.end(), i * 1000); + all_data.push_back(data); + auto block = _create_int_block(data); + st = writer->write_block(_runtime_state.get(), block); + ASSERT_TRUE(st.ok()); + } + + st = writer->close(); + ASSERT_TRUE(st.ok()); + } + + // Read & verify data integrity across parts + auto reader = spill_file->create_reader(_runtime_state.get(), _profile.get()); + st = reader->open(); + ASSERT_TRUE(st.ok()); + + size_t block_idx = 0; + bool eos = false; + while (!eos) { + Block block; + st = reader->read(&block, &eos); + ASSERT_TRUE(st.ok()); + if (eos) break; + + ASSERT_LT(block_idx, all_data.size()); + ASSERT_EQ(block.rows(), all_data[block_idx].size()); + + auto col = block.get_by_position(0).column; + for (size_t i = 0; i < all_data[block_idx].size(); ++i) { + ASSERT_EQ(col->get_int(i), all_data[block_idx][i]) + << "data mismatch at block " << block_idx << " row " << i; + } + ++block_idx; + } + + ASSERT_EQ(block_idx, all_data.size()); + + st = reader->close(); + ASSERT_TRUE(st.ok()); + + config::spill_file_part_size_bytes = saved_part_size; +} + +// ═══════════════════════════════════════════════════════════════════════ +// Seek tests +// ═══════════════════════════════════════════════════════════════════════ + +TEST_F(SpillFileTest, SeekToBlock) { + SpillFileSPtr spill_file; + auto st = ExecEnv::GetInstance()->spill_file_mgr()->create_spill_file("test_query/seek", + spill_file); + ASSERT_TRUE(st.ok()); + + const int num_blocks = 5; + + // Write + { + SpillFileWriterSPtr writer; + st = spill_file->create_writer(_runtime_state.get(), _profile.get(), writer); + ASSERT_TRUE(st.ok()); + + for (int i = 0; i < num_blocks; ++i) { + auto block = _create_int_block({i * 10, i * 10 + 1, i * 10 + 2}); + st = writer->write_block(_runtime_state.get(), block); + ASSERT_TRUE(st.ok()); + } + + st = writer->close(); + ASSERT_TRUE(st.ok()); + } + + // Seek to block 2 (0-based) and read + auto reader = spill_file->create_reader(_runtime_state.get(), _profile.get()); + st = reader->open(); + ASSERT_TRUE(st.ok()); + + reader->seek(2); + + Block block; + bool eos = false; + st = reader->read(&block, &eos); + ASSERT_TRUE(st.ok()); + ASSERT_FALSE(eos); + ASSERT_EQ(block.rows(), 3); + + auto col = block.get_by_position(0).column; + ASSERT_EQ(col->get_int(0), 20); + ASSERT_EQ(col->get_int(1), 21); + ASSERT_EQ(col->get_int(2), 22); + + st = reader->close(); + ASSERT_TRUE(st.ok()); +} + +TEST_F(SpillFileTest, SeekBeyondEnd) { + SpillFileSPtr spill_file; + auto st = ExecEnv::GetInstance()->spill_file_mgr()->create_spill_file("test_query/seek_beyond", + spill_file); + ASSERT_TRUE(st.ok()); + + // Write 3 blocks + { + SpillFileWriterSPtr writer; + st = spill_file->create_writer(_runtime_state.get(), _profile.get(), writer); + ASSERT_TRUE(st.ok()); + + for (int i = 0; i < 3; ++i) { + auto block = _create_int_block({i}); + st = writer->write_block(_runtime_state.get(), block); + ASSERT_TRUE(st.ok()); + } + + st = writer->close(); + ASSERT_TRUE(st.ok()); + } + + auto reader = spill_file->create_reader(_runtime_state.get(), _profile.get()); + st = reader->open(); + ASSERT_TRUE(st.ok()); + + // Seek beyond the end + reader->seek(100); + + Block block; + bool eos = false; + st = reader->read(&block, &eos); + ASSERT_TRUE(st.ok()); + ASSERT_TRUE(eos); + + st = reader->close(); + ASSERT_TRUE(st.ok()); +} + +// ═══════════════════════════════════════════════════════════════════════ +// SpillFile GC/lifecycle tests +// ═══════════════════════════════════════════════════════════════════════ + +TEST_F(SpillFileTest, GCCleansUpFiles) { + std::string spill_file_dir; + + { + SpillFileSPtr spill_file; + auto st = ExecEnv::GetInstance()->spill_file_mgr()->create_spill_file("test_query/gc_test", + spill_file); + ASSERT_TRUE(st.ok()); + + SpillFileWriterSPtr writer; + st = spill_file->create_writer(_runtime_state.get(), _profile.get(), writer); + ASSERT_TRUE(st.ok()); + + auto block = _create_int_block({1, 2, 3}); + st = writer->write_block(_runtime_state.get(), block); + ASSERT_TRUE(st.ok()); + + st = writer->close(); + ASSERT_TRUE(st.ok()); + + // Remember the spill directory path + spill_file_dir = _data_dir_ptr->get_spill_data_path() + "/test_query/gc_test"; + + // Verify directory exists + bool exists = false; + st = io::global_local_filesystem()->exists(spill_file_dir, &exists); + ASSERT_TRUE(st.ok()); + ASSERT_TRUE(exists); + + // spill_file goes out of scope here, destructor calls gc() + } + + // After SpillFile is destroyed, the directory should be cleaned up + bool exists = false; + auto st = io::global_local_filesystem()->exists(spill_file_dir, &exists); + ASSERT_TRUE(st.ok()); + ASSERT_FALSE(exists); +} + +TEST_F(SpillFileTest, DeleteSpillFileThroughManager) { + SpillFileSPtr spill_file; + auto st = ExecEnv::GetInstance()->spill_file_mgr()->create_spill_file("test_query/mgr_delete", + spill_file); + ASSERT_TRUE(st.ok()); + + SpillFileWriterSPtr writer; + st = spill_file->create_writer(_runtime_state.get(), _profile.get(), writer); + ASSERT_TRUE(st.ok()); + + auto block = _create_int_block({1, 2, 3}); + st = writer->write_block(_runtime_state.get(), block); + ASSERT_TRUE(st.ok()); + + st = writer->close(); + ASSERT_TRUE(st.ok()); + + // Delete through manager (async GC) + ExecEnv::GetInstance()->spill_file_mgr()->delete_spill_file(spill_file); + + // Run GC to process the deletion + ExecEnv::GetInstance()->spill_file_mgr()->gc(1000); +} + +// ═══════════════════════════════════════════════════════════════════════ +// SpillFileManager tests +// ═══════════════════════════════════════════════════════════════════════ + +TEST_F(SpillFileTest, ManagerNextId) { + auto id1 = ExecEnv::GetInstance()->spill_file_mgr()->next_id(); + auto id2 = ExecEnv::GetInstance()->spill_file_mgr()->next_id(); + auto id3 = ExecEnv::GetInstance()->spill_file_mgr()->next_id(); + + ASSERT_EQ(id2, id1 + 1); + ASSERT_EQ(id3, id2 + 1); +} + +TEST_F(SpillFileTest, ManagerCreateMultipleFiles) { + const int num_files = 5; + std::vector<SpillFileSPtr> files; + + for (int i = 0; i < num_files; ++i) { + SpillFileSPtr spill_file; + auto st = ExecEnv::GetInstance()->spill_file_mgr()->create_spill_file( + fmt::format("test_query/multi_{}", i), spill_file); + ASSERT_TRUE(st.ok()) << "create file " << i << " failed: " << st.to_string(); + files.push_back(spill_file); + } + + // Write and close each file + for (int i = 0; i < num_files; ++i) { + SpillFileWriterSPtr writer; + auto st = files[i]->create_writer(_runtime_state.get(), _profile.get(), writer); + ASSERT_TRUE(st.ok()); + + auto block = _create_int_block({i * 100}); + st = writer->write_block(_runtime_state.get(), block); + ASSERT_TRUE(st.ok()); + + st = writer->close(); + ASSERT_TRUE(st.ok()); + } + + // Read each file and verify + for (int i = 0; i < num_files; ++i) { + auto reader = files[i]->create_reader(_runtime_state.get(), _profile.get()); + auto st = reader->open(); + ASSERT_TRUE(st.ok()); + + Block block; + bool eos = false; + st = reader->read(&block, &eos); + ASSERT_TRUE(st.ok()); + ASSERT_EQ(block.rows(), 1); + + auto col = block.get_by_position(0).column; + ASSERT_EQ(col->get_int(0), i * 100); + + st = reader->close(); + ASSERT_TRUE(st.ok()); + } +} + +// ═══════════════════════════════════════════════════════════════════════ +// Profile counter tests +// ═══════════════════════════════════════════════════════════════════════ + +TEST_F(SpillFileTest, WriteCounters) { + SpillFileSPtr spill_file; + auto st = ExecEnv::GetInstance()->spill_file_mgr()->create_spill_file("test_query/counters", + spill_file); + ASSERT_TRUE(st.ok()); + + SpillFileWriterSPtr writer; + st = spill_file->create_writer(_runtime_state.get(), _profile.get(), writer); + ASSERT_TRUE(st.ok()); + + auto block = _create_int_block({1, 2, 3, 4, 5}); + st = writer->write_block(_runtime_state.get(), block); + ASSERT_TRUE(st.ok()); + + auto block2 = _create_int_block({10, 20, 30}); + st = writer->write_block(_runtime_state.get(), block2); + ASSERT_TRUE(st.ok()); + + st = writer->close(); + ASSERT_TRUE(st.ok()); + + auto* write_rows = _custom_profile->get_counter("SpillWriteRows"); + ASSERT_TRUE(write_rows != nullptr); + ASSERT_EQ(write_rows->value(), 8); + + auto* write_blocks = _custom_profile->get_counter("SpillWriteBlockCount"); + ASSERT_TRUE(write_blocks != nullptr); + ASSERT_EQ(write_blocks->value(), 2); + + auto* write_bytes = _custom_profile->get_counter("SpillWriteFileBytes"); + ASSERT_TRUE(write_bytes != nullptr); + ASSERT_GT(write_bytes->value(), 0); +} + +TEST_F(SpillFileTest, ReadCounters) { + SpillFileSPtr spill_file; + auto st = ExecEnv::GetInstance()->spill_file_mgr()->create_spill_file( + "test_query/read_counters", spill_file); + ASSERT_TRUE(st.ok()); + + // Write + { + SpillFileWriterSPtr writer; + st = spill_file->create_writer(_runtime_state.get(), _profile.get(), writer); + ASSERT_TRUE(st.ok()); + + auto block = _create_int_block({1, 2, 3, 4, 5}); + st = writer->write_block(_runtime_state.get(), block); + ASSERT_TRUE(st.ok()); + + st = writer->close(); + ASSERT_TRUE(st.ok()); + } + + // Read + auto reader = spill_file->create_reader(_runtime_state.get(), _profile.get()); + st = reader->open(); + ASSERT_TRUE(st.ok()); + + Block block; + bool eos = false; + st = reader->read(&block, &eos); + ASSERT_TRUE(st.ok()); + + st = reader->close(); + ASSERT_TRUE(st.ok()); + + auto* read_blocks = _custom_profile->get_counter("SpillReadBlockCount"); + ASSERT_TRUE(read_blocks != nullptr); + ASSERT_EQ(read_blocks->value(), 1); + + auto* read_rows = _custom_profile->get_counter("SpillReadRows"); + ASSERT_TRUE(read_rows != nullptr); + ASSERT_EQ(read_rows->value(), 5); + + auto* read_file_size = _custom_profile->get_counter("SpillReadFileBytes"); + ASSERT_TRUE(read_file_size != nullptr); + ASSERT_GT(read_file_size->value(), 0); +} + +// ═══════════════════════════════════════════════════════════════════════ +// SpillDataDir tests +// ═══════════════════════════════════════════════════════════════════════ + +TEST_F(SpillFileTest, DataDirCapacityTracking) { + SpillFileSPtr spill_file; + auto st = ExecEnv::GetInstance()->spill_file_mgr()->create_spill_file("test_query/capacity", + spill_file); + ASSERT_TRUE(st.ok()); + + auto initial_bytes = _data_dir_ptr->get_spill_data_bytes(); + + SpillFileWriterSPtr writer; + st = spill_file->create_writer(_runtime_state.get(), _profile.get(), writer); + ASSERT_TRUE(st.ok()); + + // Write a block to increase usage + std::vector<int32_t> data(1000); + std::iota(data.begin(), data.end(), 0); + auto block = _create_int_block(data); + st = writer->write_block(_runtime_state.get(), block); + ASSERT_TRUE(st.ok()); + + st = writer->close(); + ASSERT_TRUE(st.ok()); + + auto after_write_bytes = _data_dir_ptr->get_spill_data_bytes(); + ASSERT_GT(after_write_bytes, initial_bytes); +} + +} // namespace doris::vectorized --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
