This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch spill_repartition
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/spill_repartition by this push:
new 11b62957f61 refactor code and add unit test
11b62957f61 is described below
commit 11b62957f61e6239f8548b9de83698460f623587
Author: yiguolei <[email protected]>
AuthorDate: Wed Mar 4 15:11:56 2026 +0800
refactor code and add unit test
---
.../exec/partitioned_aggregation_sink_operator.cpp | 89 +-
.../exec/partitioned_aggregation_sink_operator.h | 33 +-
.../partitioned_aggregation_source_operator.cpp | 20 +-
.../exec/partitioned_aggregation_source_operator.h | 20 +-
be/test/vec/spill/spill_file_test.cpp | 1040 ++++++++++++++++++++
5 files changed, 1122 insertions(+), 80 deletions(-)
diff --git a/be/src/pipeline/exec/partitioned_aggregation_sink_operator.cpp
b/be/src/pipeline/exec/partitioned_aggregation_sink_operator.cpp
index fd785f78335..c2e36dba2b8 100644
--- a/be/src/pipeline/exec/partitioned_aggregation_sink_operator.cpp
+++ b/be/src/pipeline/exec/partitioned_aggregation_sink_operator.cpp
@@ -51,15 +51,15 @@ Status
PartitionedAggSinkLocalState::init(doris::RuntimeState* state,
auto& parent = Base::_parent->template cast<Parent>();
_spill_writers.resize(parent._partition_count);
- RETURN_IF_ERROR(setup_in_memory_agg_op(state));
+ RETURN_IF_ERROR(_setup_in_memory_agg_op(state));
for (const auto& probe_expr_ctx :
Base::_shared_state->_in_mem_shared_state->probe_expr_ctxs) {
-
key_columns_.emplace_back(probe_expr_ctx->root()->data_type()->create_column());
+
_key_columns.emplace_back(probe_expr_ctx->root()->data_type()->create_column());
}
for (const auto& aggregate_evaluator :
Base::_shared_state->_in_mem_shared_state->aggregate_evaluators) {
-
value_data_types_.emplace_back(aggregate_evaluator->function()->get_serialized_type());
-
value_columns_.emplace_back(aggregate_evaluator->function()->create_serialize_column());
+
_value_data_types.emplace_back(aggregate_evaluator->function()->get_serialized_type());
+
_value_columns.emplace_back(aggregate_evaluator->function()->create_serialize_column());
}
_rows_in_partitions.assign(parent._partition_count, 0);
return Status::OK();
@@ -102,7 +102,7 @@ void PartitionedAggSinkLocalState::_init_counters() {
update_profile_from_inner_profile<spilled>(name, custom_profile(),
child_profile)
template <bool spilled>
-void PartitionedAggSinkLocalState::update_profile(RuntimeProfile*
child_profile) {
+void PartitionedAggSinkLocalState::_update_profile(RuntimeProfile*
child_profile) {
UPDATE_PROFILE("MemoryUsageHashTable");
UPDATE_PROFILE("MemoryUsageSerializeKeyArena");
UPDATE_PROFILE("BuildTime");
@@ -159,7 +159,7 @@ Status
PartitionedAggSinkOperatorX::sink(doris::RuntimeState* state, vectorized:
}
} else {
auto* sink_local_state =
local_state._runtime_state->get_sink_local_state();
- local_state.update_profile<false>(sink_local_state->custom_profile());
+ local_state._update_profile<false>(sink_local_state->custom_profile());
}
// finally perform EOS bookkeeping
@@ -190,7 +190,7 @@ Status
PartitionedAggSinkOperatorX::sink(doris::RuntimeState* state, vectorized:
Status PartitionedAggSinkOperatorX::revoke_memory(RuntimeState* state) {
auto& local_state = get_local_state(state);
- return local_state.revoke_memory(state);
+ return local_state._revoke_memory(state);
}
size_t PartitionedAggSinkOperatorX::revocable_mem_size(RuntimeState* state)
const {
@@ -204,7 +204,7 @@ size_t
PartitionedAggSinkOperatorX::revocable_mem_size(RuntimeState* state) cons
return size > state->spill_min_revocable_mem() ? size : 0;
}
-Status PartitionedAggSinkLocalState::setup_in_memory_agg_op(RuntimeState*
state) {
+Status PartitionedAggSinkLocalState::_setup_in_memory_agg_op(RuntimeState*
state) {
_runtime_state = RuntimeState::create_unique(
state->fragment_instance_id(), state->query_id(),
state->fragment_id(),
state->query_options(), TQueryGlobals {}, state->exec_env(),
state->get_query_ctx());
@@ -244,18 +244,19 @@ size_t
PartitionedAggSinkOperatorX::get_reserve_mem_size(RuntimeState* state, bo
}
template <typename HashTableCtxType, typename KeyType>
-Status PartitionedAggSinkLocalState::to_block(HashTableCtxType& context,
std::vector<KeyType>& keys,
-
std::vector<vectorized::AggregateDataPtr>& values,
- const
vectorized::AggregateDataPtr null_key_data) {
+Status PartitionedAggSinkLocalState::_to_block(HashTableCtxType& context,
+ std::vector<KeyType>& keys,
+
std::vector<vectorized::AggregateDataPtr>& values,
+ const
vectorized::AggregateDataPtr null_key_data) {
SCOPED_TIMER(_spill_serialize_hash_table_timer);
- context.insert_keys_into_columns(keys, key_columns_,
(uint32_t)keys.size());
+ context.insert_keys_into_columns(keys, _key_columns,
(uint32_t)keys.size());
if (null_key_data) {
// only one key of group by support wrap null key
// here need additional processing logic on the null key / value
- CHECK(key_columns_.size() == 1);
- CHECK(key_columns_[0]->is_nullable());
- key_columns_[0]->insert_data(nullptr, 0);
+ CHECK(_key_columns.size() == 1);
+ CHECK(_key_columns[0]->is_nullable());
+ _key_columns[0]->insert_data(nullptr, 0);
values.emplace_back(null_key_data);
}
@@ -267,33 +268,33 @@ Status
PartitionedAggSinkLocalState::to_block(HashTableCtxType& context, std::ve
->serialize_to_column(
values,
Base::_shared_state->_in_mem_shared_state->offsets_of_aggregate_states[i],
- value_columns_[i], values.size());
+ _value_columns[i], values.size());
}
vectorized::ColumnsWithTypeAndName key_columns_with_schema;
- for (int i = 0; i < key_columns_.size(); ++i) {
+ for (int i = 0; i < _key_columns.size(); ++i) {
key_columns_with_schema.emplace_back(
- std::move(key_columns_[i]),
+ std::move(_key_columns[i]),
Base::_shared_state->_in_mem_shared_state->probe_expr_ctxs[i]->root()->data_type(),
Base::_shared_state->_in_mem_shared_state->probe_expr_ctxs[i]->root()->expr_name());
}
- key_block_ = key_columns_with_schema;
+ _key_block = key_columns_with_schema;
vectorized::ColumnsWithTypeAndName value_columns_with_schema;
- for (int i = 0; i < value_columns_.size(); ++i) {
+ for (int i = 0; i < _value_columns.size(); ++i) {
value_columns_with_schema.emplace_back(
- std::move(value_columns_[i]), value_data_types_[i],
+ std::move(_value_columns[i]), _value_data_types[i],
Base::_shared_state->_in_mem_shared_state->aggregate_evaluators[i]
->function()
->get_name());
}
- value_block_ = value_columns_with_schema;
+ _value_block = value_columns_with_schema;
- for (const auto& column : key_block_.get_columns_with_type_and_name()) {
- block_.insert(column);
+ for (const auto& column : _key_block.get_columns_with_type_and_name()) {
+ _block.insert(column);
}
- for (const auto& column : value_block_.get_columns_with_type_and_name()) {
- block_.insert(column);
+ for (const auto& column : _value_block.get_columns_with_type_and_name()) {
+ _block.insert(column);
}
return Status::OK();
}
@@ -303,7 +304,7 @@ Status PartitionedAggSinkLocalState::_spill_partition(
RuntimeState* state, HashTableCtxType& context, size_t partition_idx,
std::vector<KeyType>& keys, std::vector<vectorized::AggregateDataPtr>&
values,
const vectorized::AggregateDataPtr null_key_data, bool is_last) {
- auto status = to_block(context, keys, values, null_key_data);
+ auto status = _to_block(context, keys, values, null_key_data);
RETURN_IF_ERROR(status);
if (is_last) {
@@ -335,7 +336,7 @@ Status PartitionedAggSinkLocalState::_spill_partition(
RETURN_IF_ERROR(spill_file->create_writer(state,
Base::operator_profile(), writer));
}
- RETURN_IF_ERROR(writer->write_block(state, block_));
+ RETURN_IF_ERROR(writer->write_block(state, _block));
_reset_tmp_data();
return Status::OK();
}
@@ -411,7 +412,7 @@ Status
PartitionedAggSinkLocalState::_spill_hash_table(RuntimeState* state,
return Status::OK();
}
-Status PartitionedAggSinkLocalState::revoke_memory(RuntimeState* state) {
+Status PartitionedAggSinkLocalState::_revoke_memory(RuntimeState* state) {
if (_eos) {
return Status::OK();
}
@@ -425,9 +426,9 @@ Status
PartitionedAggSinkLocalState::revoke_memory(RuntimeState* state) {
if (!_shared_state->_is_spilled) {
_shared_state->_is_spilled = true;
custom_profile()->add_info_string("Spilled", "true");
- update_profile<false>(sink_local_state->custom_profile());
+ _update_profile<false>(sink_local_state->custom_profile());
} else {
- update_profile<false>(sink_local_state->custom_profile());
+ _update_profile<false>(sink_local_state->custom_profile());
}
DBUG_EXECUTE_IF("fault_inject::partitioned_agg_sink::revoke_memory_submit_func",
{
@@ -493,39 +494,39 @@ Status
PartitionedAggSinkLocalState::revoke_memory(RuntimeState* state) {
}
void PartitionedAggSinkLocalState::_reset_tmp_data() {
- block_.clear();
- key_columns_.clear();
- value_columns_.clear();
- key_block_.clear_column_data();
- value_block_.clear_column_data();
- key_columns_ = key_block_.mutate_columns();
- value_columns_ = value_block_.mutate_columns();
+ _block.clear();
+ _key_columns.clear();
+ _value_columns.clear();
+ _key_block.clear_column_data();
+ _value_block.clear_column_data();
+ _key_columns = _key_block.mutate_columns();
+ _value_columns = _value_block.mutate_columns();
}
void PartitionedAggSinkLocalState::_clear_tmp_data() {
{
vectorized::Block empty_block;
- block_.swap(empty_block);
+ _block.swap(empty_block);
}
{
vectorized::Block empty_block;
- key_block_.swap(empty_block);
+ _key_block.swap(empty_block);
}
{
vectorized::Block empty_block;
- value_block_.swap(empty_block);
+ _value_block.swap(empty_block);
}
{
vectorized::MutableColumns cols;
- key_columns_.swap(cols);
+ _key_columns.swap(cols);
}
{
vectorized::MutableColumns cols;
- value_columns_.swap(cols);
+ _value_columns.swap(cols);
}
vectorized::DataTypes tmp_value_data_types;
- value_data_types_.swap(tmp_value_data_types);
+ _value_data_types.swap(tmp_value_data_types);
}
bool PartitionedAggSinkLocalState::is_blockable() const {
diff --git a/be/src/pipeline/exec/partitioned_aggregation_sink_operator.h
b/be/src/pipeline/exec/partitioned_aggregation_sink_operator.h
index 5538c296562..6b2da40f5ff 100644
--- a/be/src/pipeline/exec/partitioned_aggregation_sink_operator.h
+++ b/be/src/pipeline/exec/partitioned_aggregation_sink_operator.h
@@ -42,20 +42,21 @@ public:
PartitionedAggSinkLocalState(DataSinkOperatorXBase* parent, RuntimeState*
state);
~PartitionedAggSinkLocalState() override = default;
- friend class PartitionedAggSinkOperatorX;
-
Status init(RuntimeState* state, LocalSinkStateInfo& info) override;
Status open(RuntimeState* state) override;
Status close(RuntimeState* state, Status exec_status) override;
- Status revoke_memory(RuntimeState* state);
+ bool is_blockable() const override;
- Status setup_in_memory_agg_op(RuntimeState* state);
+private:
+ friend class PartitionedAggSinkOperatorX;
- template <bool spilled>
- void update_profile(RuntimeProfile* child_profile);
+ Status _revoke_memory(RuntimeState* state);
- bool is_blockable() const override;
+ Status _setup_in_memory_agg_op(RuntimeState* state);
+
+ template <bool spilled>
+ void _update_profile(RuntimeProfile* child_profile);
template <typename KeyType>
struct TmpSpillInfo {
@@ -74,9 +75,9 @@ public:
const vectorized::AggregateDataPtr null_key_data,
bool is_last);
template <typename HashTableCtxType, typename KeyType>
- Status to_block(HashTableCtxType& context, std::vector<KeyType>& keys,
- std::vector<vectorized::AggregateDataPtr>& values,
- const vectorized::AggregateDataPtr null_key_data);
+ Status _to_block(HashTableCtxType& context, std::vector<KeyType>& keys,
+ std::vector<vectorized::AggregateDataPtr>& values,
+ const vectorized::AggregateDataPtr null_key_data);
void _reset_tmp_data();
void _clear_tmp_data();
@@ -85,12 +86,12 @@ public:
std::unique_ptr<RuntimeState> _runtime_state;
// temp structures during spilling
- vectorized::MutableColumns key_columns_;
- vectorized::MutableColumns value_columns_;
- vectorized::DataTypes value_data_types_;
- vectorized::Block block_;
- vectorized::Block key_block_;
- vectorized::Block value_block_;
+ vectorized::MutableColumns _key_columns;
+ vectorized::MutableColumns _value_columns;
+ vectorized::DataTypes _value_data_types;
+ vectorized::Block _block;
+ vectorized::Block _key_block;
+ vectorized::Block _value_block;
std::unique_ptr<RuntimeProfile> _internal_runtime_profile;
RuntimeProfile::Counter* _memory_usage_reserved = nullptr;
diff --git a/be/src/pipeline/exec/partitioned_aggregation_source_operator.cpp
b/be/src/pipeline/exec/partitioned_aggregation_source_operator.cpp
index fe2ebb2df78..9f87efa490e 100644
--- a/be/src/pipeline/exec/partitioned_aggregation_source_operator.cpp
+++ b/be/src/pipeline/exec/partitioned_aggregation_source_operator.cpp
@@ -67,7 +67,7 @@ Status PartitionedAggLocalState::open(RuntimeState* state) {
return Status::OK();
}
_opened = true;
- RETURN_IF_ERROR(setup_in_memory_agg_op(state));
+ RETURN_IF_ERROR(_setup_in_memory_agg_op(state));
return Status::OK();
}
@@ -76,7 +76,7 @@ Status PartitionedAggLocalState::open(RuntimeState* state) {
update_profile_from_inner_profile<spilled>(name, custom_profile(),
child_profile)
template <bool spilled>
-void PartitionedAggLocalState::update_profile(RuntimeProfile* child_profile) {
+void PartitionedAggLocalState::_update_profile(RuntimeProfile* child_profile) {
UPDATE_COUNTER_FROM_INNER("GetResultsTime");
UPDATE_COUNTER_FROM_INNER("HashTableIterateTime");
UPDATE_COUNTER_FROM_INNER("InsertKeysToColumnTime");
@@ -219,7 +219,7 @@ Status
PartitionedAggSourceOperatorX::revoke_memory(RuntimeState* state) {
PrettyPrinter::print_bytes(local_state._estimate_memory_usage));
// Flush hash table + repartition remaining spill files of the current
partition.
- RETURN_IF_ERROR(local_state.flush_and_repartition(state));
+ RETURN_IF_ERROR(local_state._flush_and_repartition(state));
local_state._current_partition = AggSpillPartitionInfo {};
local_state._need_to_setup_partition = true;
return Status::OK();
@@ -241,7 +241,7 @@ Status
PartitionedAggSourceOperatorX::get_block(RuntimeState* state, vectorized:
if (*eos) {
auto* source_local_state =
runtime_state->get_local_state(_agg_source_operator->operator_id());
-
local_state.update_profile<false>(source_local_state->custom_profile());
+
local_state._update_profile<false>(source_local_state->custom_profile());
}
local_state.reached_limit(block, eos);
return Status::OK();
@@ -314,7 +314,7 @@ Status
PartitionedAggSourceOperatorX::get_block(RuntimeState* state, vectorized:
if (inner_eos) {
auto* source_local_state =
runtime_state->get_local_state(_agg_source_operator->operator_id());
- local_state.update_profile<true>(source_local_state->custom_profile());
+
local_state._update_profile<true>(source_local_state->custom_profile());
// Current partition fully output. Reset hash table, pop next
partition.
RETURN_IF_ERROR(_agg_source_operator->reset_hash_table(runtime_state));
@@ -387,7 +387,7 @@ Status
PartitionedAggLocalState::_recover_blocks_from_partition(RuntimeState* st
return Status::OK();
}
-Status PartitionedAggLocalState::setup_in_memory_agg_op(RuntimeState* state) {
+Status PartitionedAggLocalState::_setup_in_memory_agg_op(RuntimeState* state) {
_runtime_state = RuntimeState::create_unique(
state->fragment_instance_id(), state->query_id(),
state->fragment_id(),
state->query_options(), TQueryGlobals {}, state->exec_env(),
state->get_query_ctx());
@@ -416,7 +416,7 @@ Status
PartitionedAggLocalState::setup_in_memory_agg_op(RuntimeState* state) {
return source_local_state->open(state);
}
-Status
PartitionedAggLocalState::flush_hash_table_to_sub_spill_files(RuntimeState*
state) {
+Status
PartitionedAggLocalState::_flush_hash_table_to_sub_spill_files(RuntimeState*
state) {
auto* runtime_state = _runtime_state.get();
auto& p = _parent->cast<PartitionedAggSourceOperatorX>();
auto* in_mem_state = _shared_state->_in_mem_shared_state;
@@ -439,7 +439,7 @@ Status
PartitionedAggLocalState::flush_hash_table_to_sub_spill_files(RuntimeStat
return Status::OK();
}
-Status PartitionedAggLocalState::flush_and_repartition(RuntimeState* state) {
+Status PartitionedAggLocalState::_flush_and_repartition(RuntimeState* state) {
auto& p = _parent->cast<PartitionedAggSourceOperatorX>();
const int new_level = _current_partition.level + 1;
@@ -459,7 +459,7 @@ Status
PartitionedAggLocalState::flush_and_repartition(RuntimeState* state) {
{
auto* source_local_state =
_runtime_state->get_local_state(p._agg_source_operator->operator_id());
- update_profile<true>(source_local_state->custom_profile());
+ _update_profile<true>(source_local_state->custom_profile());
}
// 1. Create FANOUT output sub-spill-files.
@@ -485,7 +485,7 @@ Status
PartitionedAggLocalState::flush_and_repartition(RuntimeState* state) {
RETURN_IF_ERROR(_repartitioner.setup_output(state, output_spill_files));
// 2. Flush the in-memory hash table into the sub-spill-files.
- RETURN_IF_ERROR(flush_hash_table_to_sub_spill_files(state));
+ RETURN_IF_ERROR(_flush_hash_table_to_sub_spill_files(state));
// 3. Route any in-memory blocks that were recovered but not yet merged
// into the hash table. These blocks were already read from the file
diff --git a/be/src/pipeline/exec/partitioned_aggregation_source_operator.h
b/be/src/pipeline/exec/partitioned_aggregation_source_operator.h
index 518e59b5ac5..54f9aad3f8d 100644
--- a/be/src/pipeline/exec/partitioned_aggregation_source_operator.h
+++ b/be/src/pipeline/exec/partitioned_aggregation_source_operator.h
@@ -62,25 +62,25 @@ public:
Status open(RuntimeState* state) override;
Status close(RuntimeState* state) override;
- Status setup_in_memory_agg_op(RuntimeState* state);
+ bool is_blockable() const override;
- template <bool spilled>
- void update_profile(RuntimeProfile* child_profile);
+private:
+ friend class PartitionedAggSourceOperatorX;
- bool is_blockable() const override;
+ Status _setup_in_memory_agg_op(RuntimeState* state);
+
+ template <bool spilled>
+ void _update_profile(RuntimeProfile* child_profile);
/// Flush the current in-memory hash table by draining it as blocks and
routing
/// each block through the repartitioner into the output sub-spill-files.
- Status flush_hash_table_to_sub_spill_files(RuntimeState* state);
+ Status _flush_hash_table_to_sub_spill_files(RuntimeState* state);
/// Flush the in-memory hash table into FANOUT sub-spill-files,
repartition remaining
/// unread spill files from `remaining_spill_files`, and push resulting
sub-partitions into
/// `_partition_queue`. After this call the hash table is reset and
/// `remaining_spill_files` is cleared.
- Status flush_and_repartition(RuntimeState* state);
-
-private:
- friend class PartitionedAggSourceOperatorX;
+ Status _flush_and_repartition(RuntimeState* state);
/// Move all original spill_partitions from shared state into
`_partition_queue`.
/// Called once when spilled get_block is first entered.
@@ -106,7 +106,7 @@ private:
std::vector<vectorized::Block> _blocks;
// Estimated in-memory hash table size for the current partition.
- //size_t _estimate_memory_usage = 0;
+ size_t _estimate_memory_usage = 0;
// Counters to track spill partition metrics
RuntimeProfile::Counter* _max_partition_level = nullptr;
diff --git a/be/test/vec/spill/spill_file_test.cpp
b/be/test/vec/spill/spill_file_test.cpp
new file mode 100644
index 00000000000..2313ed53614
--- /dev/null
+++ b/be/test/vec/spill/spill_file_test.cpp
@@ -0,0 +1,1040 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "vec/spill/spill_file.h"
+
+#include <gtest/gtest.h>
+
+#include <algorithm>
+#include <memory>
+#include <numeric>
+#include <vector>
+
+#include "common/config.h"
+#include "io/fs/local_file_system.h"
+#include "runtime/exec_env.h"
+#include "testutil/column_helper.h"
+#include "testutil/mock/mock_runtime_state.h"
+#include "util/runtime_profile.h"
+#include "vec/core/block.h"
+#include "vec/data_types/data_type_number.h"
+#include "vec/data_types/data_type_string.h"
+#include "vec/spill/spill_file_manager.h"
+#include "vec/spill/spill_file_reader.h"
+#include "vec/spill/spill_file_writer.h"
+
+namespace doris::vectorized {
+
+class SpillFileTest : public testing::Test {
+protected:
+    // Per-test fixture: a mock runtime state, a profile tree ("test" with the
+    // "CustomCounters" and "CommonCounters" children) that pre-registers the spill
+    // counters by name, and a SpillFileManager over a scratch directory that is
+    // installed into the global ExecEnv for the duration of one test.
+    void SetUp() override {
+        _runtime_state = std::make_unique<MockRuntimeState>();
+
+        _profile = std::make_unique<RuntimeProfile>("test");
+        _custom_profile = std::make_unique<RuntimeProfile>("CustomCounters");
+        _common_profile = std::make_unique<RuntimeProfile>("CommonCounters");
+
+        _common_profile->AddHighWaterMarkCounter("MemoryUsage", TUnit::BYTES, "", 1);
+        ADD_TIMER_WITH_LEVEL(_common_profile.get(), "ExecTime", 1);
+
+        // Time counters are registered as timers and size counters in bytes so the
+        // registered unit matches what each counter measures.
+        ADD_TIMER_WITH_LEVEL(_custom_profile.get(), "SpillTotalTime", 1);
+        ADD_TIMER_WITH_LEVEL(_custom_profile.get(), "SpillWriteTime", 1);
+        ADD_COUNTER_WITH_LEVEL(_custom_profile.get(), "SpillWriteTaskWaitInQueueCount", TUnit::UNIT,
+                               1);
+        ADD_COUNTER_WITH_LEVEL(_custom_profile.get(), "SpillWriteTaskCount", TUnit::UNIT, 1);
+        ADD_TIMER_WITH_LEVEL(_custom_profile.get(), "SpillWriteTaskWaitInQueueTime", 1);
+        ADD_TIMER_WITH_LEVEL(_custom_profile.get(), "SpillWriteFileTime", 1);
+        ADD_TIMER_WITH_LEVEL(_custom_profile.get(), "SpillWriteSerializeBlockTime", 1);
+        ADD_COUNTER_WITH_LEVEL(_custom_profile.get(), "SpillWriteBlockCount", TUnit::UNIT, 1);
+        ADD_COUNTER_WITH_LEVEL(_custom_profile.get(), "SpillWriteBlockBytes", TUnit::BYTES, 1);
+        ADD_COUNTER_WITH_LEVEL(_custom_profile.get(), "SpillWriteFileBytes", TUnit::BYTES, 1);
+        ADD_COUNTER_WITH_LEVEL(_custom_profile.get(), "SpillWriteRows", TUnit::UNIT, 1);
+        ADD_TIMER_WITH_LEVEL(_custom_profile.get(), "SpillReadFileTime", 1);
+        // NOTE(review): "Derialize" spelling is kept as-is; it presumably has to match
+        // the name the reader looks up — confirm before renaming.
+        ADD_TIMER_WITH_LEVEL(_custom_profile.get(), "SpillReadDerializeBlockTime", 1);
+        ADD_COUNTER_WITH_LEVEL(_custom_profile.get(), "SpillReadBlockCount", TUnit::UNIT, 1);
+        ADD_COUNTER_WITH_LEVEL(_custom_profile.get(), "SpillReadBlockBytes", TUnit::BYTES, 1);
+        ADD_COUNTER_WITH_LEVEL(_custom_profile.get(), "SpillReadFileBytes", TUnit::BYTES, 1);
+        ADD_COUNTER_WITH_LEVEL(_custom_profile.get(), "SpillReadRows", TUnit::UNIT, 1);
+        ADD_COUNTER_WITH_LEVEL(_custom_profile.get(), "SpillReadFileCount", TUnit::UNIT, 1);
+        ADD_COUNTER_WITH_LEVEL(_custom_profile.get(), "SpillWriteFileTotalCount", TUnit::UNIT, 1);
+        ADD_COUNTER_WITH_LEVEL(_custom_profile.get(), "SpillWriteFileCurrentCount", TUnit::UNIT, 1);
+        ADD_COUNTER_WITH_LEVEL(_custom_profile.get(), "SpillWriteFileCurrentBytes", TUnit::BYTES,
+                               1);
+
+        _profile->add_child(_custom_profile.get(), true);
+        _profile->add_child(_common_profile.get(), true);
+
+        _spill_dir = "./ut_dir/spill_file_test";
+        auto spill_data_dir = std::make_unique<SpillDataDir>(_spill_dir, 1024L * 1024 * 128);
+        auto st = io::global_local_filesystem()->create_directory(spill_data_dir->path(), false);
+        ASSERT_TRUE(st.ok()) << "create directory failed: " << st.to_string();
+
+        std::unordered_map<std::string, std::unique_ptr<SpillDataDir>> data_map;
+        _data_dir_ptr = spill_data_dir.get();
+        data_map.emplace("test", std::move(spill_data_dir));
+        // Ownership of the manager passes to ExecEnv; TearDown deletes it.
+        auto* spill_file_manager = new SpillFileManager(std::move(data_map));
+        ExecEnv::GetInstance()->_spill_file_mgr = spill_file_manager;
+        st = spill_file_manager->init();
+        ASSERT_TRUE(st.ok()) << "init spill file manager failed: " << st.to_string();
+    }
+
+    // Undo SetUp. gtest runs TearDown even when an ASSERT in SetUp aborted it early,
+    // so the manager may never have been installed: guard before calling stop().
+    void TearDown() override {
+        if (auto* spill_file_mgr = ExecEnv::GetInstance()->spill_file_mgr();
+            spill_file_mgr != nullptr) {
+            spill_file_mgr->stop();
+        }
+        SAFE_DELETE(ExecEnv::GetInstance()->_spill_file_mgr);
+        // The data dir was owned by the manager deleted above.
+        _data_dir_ptr = nullptr;
+        // Clean up test directory (best effort).
+        if (!_spill_dir.empty()) {
+            auto st = io::global_local_filesystem()->delete_directory(_spill_dir);
+            (void)st;
+        }
+        _runtime_state.reset();
+    }
+
+    // Single-column Int32 block holding `data`.
+    Block _create_int_block(const std::vector<int32_t>& data) {
+        return ColumnHelper::create_block<DataTypeInt32>(data);
+    }
+
+    // Two-column block: Int32 `col1` followed by Int64 `col2`.
+    Block _create_two_column_block(const std::vector<int32_t>& col1,
+                                   const std::vector<int64_t>& col2) {
+        auto block = ColumnHelper::create_block<DataTypeInt32>(col1);
+        block.insert(ColumnHelper::create_column_with_name<DataTypeInt64>(col2));
+        return block;
+    }
+
+    std::unique_ptr<MockRuntimeState> _runtime_state;
+    std::unique_ptr<RuntimeProfile> _profile;
+    std::unique_ptr<RuntimeProfile> _custom_profile;
+    std::unique_ptr<RuntimeProfile> _common_profile;
+    std::string _spill_dir;
+    SpillDataDir* _data_dir_ptr = nullptr;
+};
+
+// ═══════════════════════════════════════════════════════════════════════
+// SpillFile basic tests
+// ═══════════════════════════════════════════════════════════════════════
+
+TEST_F(SpillFileTest, CreateSpillFile) {
+    // A newly created spill file exists but has had no writer closed on it, so it must
+    // not yet report itself as ready for reading.
+    auto* manager = ExecEnv::GetInstance()->spill_file_mgr();
+    SpillFileSPtr file;
+    const auto status = manager->create_spill_file("test_query/test_file", file);
+    ASSERT_TRUE(status.ok()) << status.to_string();
+    ASSERT_TRUE(file != nullptr);
+    ASSERT_FALSE(file->ready_for_reading());
+}
+
+TEST_F(SpillFileTest, CreateWriterAndReader) {
+    // Closing a writer that never received a block still publishes the file; a reader
+    // over that empty file (0 parts) reports EOS on its very first read.
+    auto* manager = ExecEnv::GetInstance()->spill_file_mgr();
+    SpillFileSPtr file;
+    auto status = manager->create_spill_file("test_query/create_wr", file);
+    ASSERT_TRUE(status.ok()) << status.to_string();
+
+    SpillFileWriterSPtr file_writer;
+    status = file->create_writer(_runtime_state.get(), _profile.get(), file_writer);
+    ASSERT_TRUE(status.ok()) << status.to_string();
+    ASSERT_TRUE(file_writer != nullptr);
+
+    // Nothing is written before close.
+    status = file_writer->close();
+    ASSERT_TRUE(status.ok()) << status.to_string();
+    ASSERT_TRUE(file->ready_for_reading());
+
+    auto file_reader = file->create_reader(_runtime_state.get(), _profile.get());
+    ASSERT_TRUE(file_reader != nullptr);
+
+    status = file_reader->open();
+    ASSERT_TRUE(status.ok()) << status.to_string();
+
+    Block output;
+    bool reached_eos = false;
+    status = file_reader->read(&output, &reached_eos);
+    ASSERT_TRUE(status.ok()) << status.to_string();
+    ASSERT_TRUE(reached_eos);
+
+    status = file_reader->close();
+    ASSERT_TRUE(status.ok()) << status.to_string();
+}
+
+// ═══════════════════════════════════════════════════════════════════════
+// SpillFileWriter tests
+// ═══════════════════════════════════════════════════════════════════════
+
+TEST_F(SpillFileTest, WriteSingleBlock) {
+    // Writing one 5-row block and closing the writer makes the file readable and is
+    // reflected in the SpillWriteRows / SpillWriteBlockCount counters.
+    SpillFileSPtr spill_file;
+    auto st = ExecEnv::GetInstance()->spill_file_mgr()->create_spill_file(
+            "test_query/single_block", spill_file);
+    ASSERT_TRUE(st.ok()) << st.to_string();
+
+    SpillFileWriterSPtr writer;
+    st = spill_file->create_writer(_runtime_state.get(), _profile.get(), writer);
+    ASSERT_TRUE(st.ok()) << st.to_string();
+
+    auto block = _create_int_block({1, 2, 3, 4, 5});
+    st = writer->write_block(_runtime_state.get(), block);
+    ASSERT_TRUE(st.ok()) << st.to_string();
+
+    st = writer->close();
+    ASSERT_TRUE(st.ok()) << st.to_string();
+
+    ASSERT_TRUE(spill_file->ready_for_reading());
+
+    auto* write_rows_counter = _custom_profile->get_counter("SpillWriteRows");
+    ASSERT_NE(write_rows_counter, nullptr);
+    ASSERT_EQ(write_rows_counter->value(), 5);
+
+    auto* write_block_counter = _custom_profile->get_counter("SpillWriteBlockCount");
+    ASSERT_NE(write_block_counter, nullptr);
+    ASSERT_EQ(write_block_counter->value(), 1);
+}
+
+TEST_F(SpillFileTest, WriteMultipleBlocks) {
+    // Five 3-row blocks written through one writer must be counted as 15 rows in
+    // 5 blocks by the write-side profile counters.
+    SpillFileSPtr spill_file;
+    auto st = ExecEnv::GetInstance()->spill_file_mgr()->create_spill_file(
+            "test_query/multi_blocks", spill_file);
+    ASSERT_TRUE(st.ok()) << st.to_string();
+
+    SpillFileWriterSPtr writer;
+    st = spill_file->create_writer(_runtime_state.get(), _profile.get(), writer);
+    ASSERT_TRUE(st.ok()) << st.to_string();
+
+    for (int i = 0; i < 5; ++i) {
+        auto block = _create_int_block({i * 10, i * 10 + 1, i * 10 + 2});
+        st = writer->write_block(_runtime_state.get(), block);
+        ASSERT_TRUE(st.ok()) << "write block " << i << " failed: " << st.to_string();
+    }
+
+    st = writer->close();
+    ASSERT_TRUE(st.ok()) << st.to_string();
+
+    // Check for nullptr first so a missing counter fails the test instead of crashing.
+    auto* write_rows_counter = _custom_profile->get_counter("SpillWriteRows");
+    ASSERT_NE(write_rows_counter, nullptr);
+    ASSERT_EQ(write_rows_counter->value(), 15);
+
+    auto* write_block_counter = _custom_profile->get_counter("SpillWriteBlockCount");
+    ASSERT_NE(write_block_counter, nullptr);
+    ASSERT_EQ(write_block_counter->value(), 5);
+}
+
+TEST_F(SpillFileTest, WriteTwoColumnBlock) {
+    // A multi-column (Int32, Int64) block is accepted by the writer and its rows are
+    // counted once, not once per column.
+    SpillFileSPtr spill_file;
+    auto st = ExecEnv::GetInstance()->spill_file_mgr()->create_spill_file(
+            "test_query/two_col", spill_file);
+    ASSERT_TRUE(st.ok()) << st.to_string();
+
+    SpillFileWriterSPtr writer;
+    st = spill_file->create_writer(_runtime_state.get(), _profile.get(), writer);
+    ASSERT_TRUE(st.ok()) << st.to_string();
+
+    auto block = _create_two_column_block({1, 2, 3}, {100, 200, 300});
+    st = writer->write_block(_runtime_state.get(), block);
+    ASSERT_TRUE(st.ok()) << st.to_string();
+
+    st = writer->close();
+    ASSERT_TRUE(st.ok()) << st.to_string();
+    ASSERT_TRUE(spill_file->ready_for_reading());
+
+    auto* write_rows_counter = _custom_profile->get_counter("SpillWriteRows");
+    ASSERT_NE(write_rows_counter, nullptr);
+    ASSERT_EQ(write_rows_counter->value(), 3);
+}
+
+TEST_F(SpillFileTest, WriteEmptyBlock) {
+    // Writing a Block with no columns/rows must succeed and leave the file in the
+    // same state as a writer that wrote nothing: readable, with zero rows counted.
+    SpillFileSPtr spill_file;
+    auto st = ExecEnv::GetInstance()->spill_file_mgr()->create_spill_file(
+            "test_query/empty_block", spill_file);
+    ASSERT_TRUE(st.ok()) << st.to_string();
+
+    SpillFileWriterSPtr writer;
+    st = spill_file->create_writer(_runtime_state.get(), _profile.get(), writer);
+    ASSERT_TRUE(st.ok()) << st.to_string();
+
+    Block empty_block;
+    st = writer->write_block(_runtime_state.get(), empty_block);
+    ASSERT_TRUE(st.ok()) << st.to_string();
+
+    st = writer->close();
+    ASSERT_TRUE(st.ok()) << st.to_string();
+    ASSERT_TRUE(spill_file->ready_for_reading());
+
+    auto* write_rows_counter = _custom_profile->get_counter("SpillWriteRows");
+    ASSERT_NE(write_rows_counter, nullptr);
+    ASSERT_EQ(write_rows_counter->value(), 0);
+}
+
+TEST_F(SpillFileTest, DoubleCloseWriter) {
+    // Closing a writer twice must succeed both times and leave the file readable.
+    SpillFileSPtr spill_file;
+    auto st = ExecEnv::GetInstance()->spill_file_mgr()->create_spill_file(
+            "test_query/double_close", spill_file);
+    ASSERT_TRUE(st.ok()) << st.to_string();
+
+    SpillFileWriterSPtr writer;
+    st = spill_file->create_writer(_runtime_state.get(), _profile.get(), writer);
+    ASSERT_TRUE(st.ok()) << st.to_string();
+
+    auto block = _create_int_block({1, 2, 3});
+    st = writer->write_block(_runtime_state.get(), block);
+    ASSERT_TRUE(st.ok()) << st.to_string();
+
+    st = writer->close();
+    ASSERT_TRUE(st.ok()) << st.to_string();
+
+    // Double close should be a no-op
+    st = writer->close();
+    ASSERT_TRUE(st.ok()) << st.to_string();
+    ASSERT_TRUE(spill_file->ready_for_reading());
+}
+
+// ═══════════════════════════════════════════════════════════════════════
+// SpillFileReader tests
+// ═══════════════════════════════════════════════════════════════════════
+
+TEST_F(SpillFileTest, ReadSingleBlock) {
+    // Round-trip one 5-row Int32 block: the first read returns it intact and the next
+    // read reports EOS.
+    SpillFileSPtr spill_file;
+    auto st = ExecEnv::GetInstance()->spill_file_mgr()->create_spill_file(
+            "test_query/read_single", spill_file);
+    ASSERT_TRUE(st.ok()) << st.to_string();
+
+    const std::vector<int32_t> values {10, 20, 30, 40, 50};
+
+    // Write
+    {
+        SpillFileWriterSPtr writer;
+        st = spill_file->create_writer(_runtime_state.get(), _profile.get(), writer);
+        ASSERT_TRUE(st.ok()) << st.to_string();
+
+        auto block = _create_int_block(values);
+        st = writer->write_block(_runtime_state.get(), block);
+        ASSERT_TRUE(st.ok()) << st.to_string();
+
+        st = writer->close();
+        ASSERT_TRUE(st.ok()) << st.to_string();
+    }
+
+    // Read
+    auto reader = spill_file->create_reader(_runtime_state.get(), _profile.get());
+    ASSERT_TRUE(reader != nullptr);
+    st = reader->open();
+    ASSERT_TRUE(st.ok()) << st.to_string();
+
+    Block block;
+    bool eos = false;
+    st = reader->read(&block, &eos);
+    ASSERT_TRUE(st.ok()) << st.to_string();
+    ASSERT_FALSE(eos);
+    ASSERT_EQ(block.rows(), values.size());
+
+    // Verify data
+    auto col = block.get_by_position(0).column;
+    for (size_t i = 0; i < values.size(); ++i) {
+        ASSERT_EQ(col->get_int(i), values[i]) << "row " << i;
+    }
+
+    // Next read should be EOS
+    Block block2;
+    st = reader->read(&block2, &eos);
+    ASSERT_TRUE(st.ok()) << st.to_string();
+    ASSERT_TRUE(eos);
+
+    st = reader->close();
+    ASSERT_TRUE(st.ok()) << st.to_string();
+}
+
+TEST_F(SpillFileTest, ReadMultipleBlocks) {
+    // Ten 100-row blocks holding the consecutive integers [0, 1000) are read back as
+    // ten blocks, in write order, with every row accounted for.
+    SpillFileSPtr spill_file;
+    auto st = ExecEnv::GetInstance()->spill_file_mgr()->create_spill_file(
+            "test_query/read_multi", spill_file);
+    ASSERT_TRUE(st.ok()) << st.to_string();
+
+    constexpr int num_blocks = 10;
+    constexpr int rows_per_block = 100;
+
+    // Write
+    {
+        SpillFileWriterSPtr writer;
+        st = spill_file->create_writer(_runtime_state.get(), _profile.get(), writer);
+        ASSERT_TRUE(st.ok()) << st.to_string();
+
+        for (int b = 0; b < num_blocks; ++b) {
+            std::vector<int32_t> data(rows_per_block);
+            std::iota(data.begin(), data.end(), b * rows_per_block);
+            auto block = _create_int_block(data);
+            st = writer->write_block(_runtime_state.get(), block);
+            ASSERT_TRUE(st.ok()) << "write block " << b << " failed: " << st.to_string();
+        }
+
+        st = writer->close();
+        ASSERT_TRUE(st.ok()) << st.to_string();
+    }
+
+    // Read all blocks
+    auto reader = spill_file->create_reader(_runtime_state.get(), _profile.get());
+    ASSERT_TRUE(reader != nullptr);
+    st = reader->open();
+    ASSERT_TRUE(st.ok()) << st.to_string();
+
+    size_t total_rows = 0;
+    int block_count = 0;
+    bool eos = false;
+    while (!eos) {
+        Block block;
+        st = reader->read(&block, &eos);
+        ASSERT_TRUE(st.ok()) << st.to_string();
+        if (!eos) {
+            // Each block must start where its writer-side counterpart started.
+            ASSERT_EQ(block.get_by_position(0).column->get_int(0), block_count * rows_per_block)
+                    << "block " << block_count;
+            total_rows += block.rows();
+            ++block_count;
+        }
+    }
+
+    ASSERT_EQ(total_rows, static_cast<size_t>(num_blocks) * rows_per_block);
+    ASSERT_EQ(block_count, num_blocks);
+
+    st = reader->close();
+    ASSERT_TRUE(st.ok()) << st.to_string();
+}
+
+TEST_F(SpillFileTest, ReadTwoColumnBlock) {
+    // Writes one 4-row, 2-column block and checks that both columns come back.
+    SpillFileSPtr file;
+    auto status = ExecEnv::GetInstance()->spill_file_mgr()->create_spill_file(
+            "test_query/read_two_col", file);
+    ASSERT_TRUE(status.ok());
+
+    {
+        SpillFileWriterSPtr spill_writer;
+        status = file->create_writer(_runtime_state.get(), _profile.get(), spill_writer);
+        ASSERT_TRUE(status.ok());
+
+        auto input = _create_two_column_block({1, 2, 3, 4}, {100, 200, 300, 400});
+        status = spill_writer->write_block(_runtime_state.get(), input);
+        ASSERT_TRUE(status.ok());
+
+        status = spill_writer->close();
+        ASSERT_TRUE(status.ok());
+    }
+
+    auto spill_reader = file->create_reader(_runtime_state.get(), _profile.get());
+    status = spill_reader->open();
+    ASSERT_TRUE(status.ok());
+
+    Block output;
+    bool eos = false;
+    status = spill_reader->read(&output, &eos);
+    ASSERT_TRUE(status.ok());
+    ASSERT_FALSE(eos);
+    ASSERT_EQ(output.rows(), 4);
+    ASSERT_EQ(output.columns(), 2);
+
+    // Spot-check the first and last row of each column.
+    auto first_col = output.get_by_position(0).column;
+    ASSERT_EQ(first_col->get_int(0), 1);
+    ASSERT_EQ(first_col->get_int(3), 4);
+
+    auto second_col = output.get_by_position(1).column;
+    ASSERT_EQ(second_col->get_int(0), 100);
+    ASSERT_EQ(second_col->get_int(3), 400);
+
+    status = spill_reader->close();
+    ASSERT_TRUE(status.ok());
+}
+
+// ═══════════════════════════════════════════════════════════════════════
+// Roundtrip tests (write -> read -> verify)
+// ═══════════════════════════════════════════════════════════════════════
+
+TEST_F(SpillFileTest, RoundtripSingleBlock) {
+    // A block mixing zero, negative and unordered values must come back
+    // value-for-value after a write/read cycle.
+    SpillFileSPtr file;
+    auto status = ExecEnv::GetInstance()->spill_file_mgr()->create_spill_file(
+            "test_query/roundtrip_single", file);
+    ASSERT_TRUE(status.ok());
+
+    std::vector<int32_t> expected = {42, 7, 99, 1, 0, -5, 1000};
+
+    {
+        SpillFileWriterSPtr spill_writer;
+        status = file->create_writer(_runtime_state.get(), _profile.get(), spill_writer);
+        ASSERT_TRUE(status.ok());
+
+        auto input = _create_int_block(expected);
+        status = spill_writer->write_block(_runtime_state.get(), input);
+        ASSERT_TRUE(status.ok());
+
+        status = spill_writer->close();
+        ASSERT_TRUE(status.ok());
+    }
+
+    auto spill_reader = file->create_reader(_runtime_state.get(), _profile.get());
+    status = spill_reader->open();
+    ASSERT_TRUE(status.ok());
+
+    Block output;
+    bool eos = false;
+    status = spill_reader->read(&output, &eos);
+    ASSERT_TRUE(status.ok());
+    ASSERT_EQ(output.rows(), expected.size());
+
+    auto values = output.get_by_position(0).column;
+    size_t row = 0;
+    for (const auto want : expected) {
+        ASSERT_EQ(values->get_int(row), want) << "mismatch at index " << row;
+        ++row;
+    }
+
+    status = spill_reader->close();
+    ASSERT_TRUE(status.ok());
+}
+
+TEST_F(SpillFileTest, RoundtripMultipleBlocks) {
+    // Blocks of differing sizes (including negative values) must be read back in
+    // write order, value for value, followed by EOS and no extra blocks.
+    SpillFileSPtr spill_file;
+    auto st = ExecEnv::GetInstance()->spill_file_mgr()->create_spill_file(
+            "test_query/roundtrip_multi", spill_file);
+    ASSERT_TRUE(st.ok());
+
+    std::vector<std::vector<int32_t>> all_data = {
+            {1, 2, 3},
+            {10, 20, 30, 40},
+            {100, 200},
+            {-1, -2, -3, -4, -5},
+    };
+
+    // Write
+    {
+        SpillFileWriterSPtr writer;
+        st = spill_file->create_writer(_runtime_state.get(), _profile.get(), writer);
+        ASSERT_TRUE(st.ok());
+
+        for (const auto& data : all_data) {
+            auto block = _create_int_block(data);
+            st = writer->write_block(_runtime_state.get(), block);
+            ASSERT_TRUE(st.ok());
+        }
+
+        st = writer->close();
+        ASSERT_TRUE(st.ok());
+    }
+
+    // Read & verify
+    auto reader = spill_file->create_reader(_runtime_state.get(), _profile.get());
+    st = reader->open();
+    ASSERT_TRUE(st.ok());
+
+    // Read until EOS instead of stopping after all_data.size() blocks, so an extra
+    // block returned by the reader fails the test rather than being silently ignored.
+    size_t block_idx = 0;
+    bool eos = false;
+    while (true) {
+        Block block;
+        st = reader->read(&block, &eos);
+        ASSERT_TRUE(st.ok()) << st.to_string();
+        if (eos) {
+            break;
+        }
+
+        ASSERT_LT(block_idx, all_data.size()) << "reader returned more blocks than were written";
+        ASSERT_EQ(block.rows(), all_data[block_idx].size())
+                << "block " << block_idx << " row count mismatch";
+
+        auto col = block.get_by_position(0).column;
+        for (size_t i = 0; i < all_data[block_idx].size(); ++i) {
+            ASSERT_EQ(col->get_int(i), all_data[block_idx][i])
+                    << "mismatch at block " << block_idx << " row " << i;
+        }
+        ++block_idx;
+    }
+
+    ASSERT_EQ(block_idx, all_data.size());
+
+    st = reader->close();
+    ASSERT_TRUE(st.ok());
+}
+
+TEST_F(SpillFileTest, RoundtripLargeData) {
+    // A single 100k-row block (values 0..row_count-1) must survive a write/read
+    // cycle intact and be followed by EOS.
+    SpillFileSPtr spill_file;
+    auto st = ExecEnv::GetInstance()->spill_file_mgr()->create_spill_file(
+            "test_query/roundtrip_large", spill_file);
+    ASSERT_TRUE(st.ok());
+
+    const size_t row_count = 100000;
+    std::vector<int32_t> data(row_count);
+    std::iota(data.begin(), data.end(), 0);
+
+    // Write
+    {
+        SpillFileWriterSPtr writer;
+        st = spill_file->create_writer(_runtime_state.get(), _profile.get(), writer);
+        ASSERT_TRUE(st.ok());
+
+        auto block = _create_int_block(data);
+        st = writer->write_block(_runtime_state.get(), block);
+        ASSERT_TRUE(st.ok());
+
+        st = writer->close();
+        ASSERT_TRUE(st.ok());
+    }
+
+    // Read & verify
+    auto reader = spill_file->create_reader(_runtime_state.get(), _profile.get());
+    st = reader->open();
+    ASSERT_TRUE(st.ok());
+
+    Block block;
+    bool eos = false;
+    st = reader->read(&block, &eos);
+    ASSERT_TRUE(st.ok()) << st.to_string();
+    ASSERT_FALSE(eos);
+    ASSERT_EQ(block.rows(), row_count);
+
+    // Check every row rather than sampling every 1000th one: sampling would miss
+    // corruption in 999 of every 1000 rows.
+    auto col = block.get_by_position(0).column;
+    for (size_t i = 0; i < row_count; ++i) {
+        ASSERT_EQ(col->get_int(i), data[i]) << "mismatch at index " << i;
+    }
+
+    // The single written block must be followed by EOS.
+    Block trailing;
+    st = reader->read(&trailing, &eos);
+    ASSERT_TRUE(st.ok()) << st.to_string();
+    ASSERT_TRUE(eos);
+
+    st = reader->close();
+    ASSERT_TRUE(st.ok());
+}
+
+// ═══════════════════════════════════════════════════════════════════════
+// Part rotation tests
+// ═══════════════════════════════════════════════════════════════════════
+
+TEST_F(SpillFileTest, PartRotation) {
+    // Uses a very small part size so that writing 20 blocks is expected to
+    // trigger multiple part rotations. Then checks every block is still readable.
+    //
+    // The restorer puts the global config back on every exit path. An ASSERT_*
+    // failure returns from the test body early, which would otherwise leave the
+    // 1KB part size in place for every test that runs afterwards.
+    struct PartSizeRestorer {
+        decltype(config::spill_file_part_size_bytes) saved;
+        ~PartSizeRestorer() { config::spill_file_part_size_bytes = saved; }
+    } part_size_restorer {config::spill_file_part_size_bytes};
+    config::spill_file_part_size_bytes = 1024; // 1KB per part
+
+    SpillFileSPtr spill_file;
+    auto st = ExecEnv::GetInstance()->spill_file_mgr()->create_spill_file("test_query/rotation",
+                                                                         spill_file);
+    ASSERT_TRUE(st.ok());
+
+    const int num_blocks = 20;
+
+    // Write many blocks to trigger multiple part rotations
+    {
+        SpillFileWriterSPtr writer;
+        st = spill_file->create_writer(_runtime_state.get(), _profile.get(), writer);
+        ASSERT_TRUE(st.ok());
+
+        for (int i = 0; i < num_blocks; ++i) {
+            std::vector<int32_t> data(100);
+            std::iota(data.begin(), data.end(), i * 100);
+            auto block = _create_int_block(data);
+            st = writer->write_block(_runtime_state.get(), block);
+            ASSERT_TRUE(st.ok());
+        }
+
+        st = writer->close();
+        ASSERT_TRUE(st.ok());
+    }
+
+    // Read back and verify all data across multiple parts
+    auto reader = spill_file->create_reader(_runtime_state.get(), _profile.get());
+    st = reader->open();
+    ASSERT_TRUE(st.ok());
+
+    size_t total_rows = 0;
+    int block_count = 0;
+    bool eos = false;
+    while (!eos) {
+        Block block;
+        st = reader->read(&block, &eos);
+        ASSERT_TRUE(st.ok());
+        if (!eos) {
+            total_rows += block.rows();
+            ++block_count;
+        }
+    }
+
+    ASSERT_EQ(total_rows, num_blocks * 100);
+    ASSERT_EQ(block_count, num_blocks);
+
+    st = reader->close();
+    ASSERT_TRUE(st.ok());
+}
+
+TEST_F(SpillFileTest, PartRotationDataIntegrity) {
+    // Uses a 512-byte part size so the 30 written blocks are expected to span
+    // several parts. Then checks every block's values in write order.
+    //
+    // The restorer puts the global config back on every exit path. An ASSERT_*
+    // failure returns from the test body early, which would otherwise leak the
+    // tiny part size into later tests.
+    struct PartSizeRestorer {
+        decltype(config::spill_file_part_size_bytes) saved;
+        ~PartSizeRestorer() { config::spill_file_part_size_bytes = saved; }
+    } part_size_restorer {config::spill_file_part_size_bytes};
+    config::spill_file_part_size_bytes = 512;
+
+    SpillFileSPtr spill_file;
+    auto st = ExecEnv::GetInstance()->spill_file_mgr()->create_spill_file(
+            "test_query/rotation_integrity", spill_file);
+    ASSERT_TRUE(st.ok());
+
+    const int num_blocks = 30;
+    std::vector<std::vector<int32_t>> all_data;
+    all_data.reserve(num_blocks);
+
+    // Write
+    {
+        SpillFileWriterSPtr writer;
+        st = spill_file->create_writer(_runtime_state.get(), _profile.get(), writer);
+        ASSERT_TRUE(st.ok());
+
+        for (int i = 0; i < num_blocks; ++i) {
+            std::vector<int32_t> data(50);
+            std::iota(data.begin(), data.end(), i * 1000);
+            all_data.push_back(data);
+            auto block = _create_int_block(data);
+            st = writer->write_block(_runtime_state.get(), block);
+            ASSERT_TRUE(st.ok());
+        }
+
+        st = writer->close();
+        ASSERT_TRUE(st.ok());
+    }
+
+    // Read & verify data integrity across parts
+    auto reader = spill_file->create_reader(_runtime_state.get(), _profile.get());
+    st = reader->open();
+    ASSERT_TRUE(st.ok());
+
+    size_t block_idx = 0;
+    bool eos = false;
+    while (!eos) {
+        Block block;
+        st = reader->read(&block, &eos);
+        ASSERT_TRUE(st.ok());
+        if (eos) break;
+
+        ASSERT_LT(block_idx, all_data.size());
+        ASSERT_EQ(block.rows(), all_data[block_idx].size());
+
+        auto col = block.get_by_position(0).column;
+        for (size_t i = 0; i < all_data[block_idx].size(); ++i) {
+            ASSERT_EQ(col->get_int(i), all_data[block_idx][i])
+                    << "data mismatch at block " << block_idx << " row " << i;
+        }
+        ++block_idx;
+    }
+
+    ASSERT_EQ(block_idx, all_data.size());
+
+    st = reader->close();
+    ASSERT_TRUE(st.ok());
+}
+
+// ═══════════════════════════════════════════════════════════════════════
+// Seek tests
+// ═══════════════════════════════════════════════════════════════════════
+
+TEST_F(SpillFileTest, SeekToBlock) {
+    // Writes five 3-row blocks, where block k holds {10k, 10k+1, 10k+2}. Then
+    // seeks to block index 2 (0-based) and checks the next read returns that block.
+    const int num_blocks = 5;
+
+    SpillFileSPtr file;
+    auto status = ExecEnv::GetInstance()->spill_file_mgr()->create_spill_file("test_query/seek",
+                                                                             file);
+    ASSERT_TRUE(status.ok());
+
+    {
+        SpillFileWriterSPtr spill_writer;
+        status = file->create_writer(_runtime_state.get(), _profile.get(), spill_writer);
+        ASSERT_TRUE(status.ok());
+
+        for (int block_no = 0; block_no < num_blocks; ++block_no) {
+            const int base = block_no * 10;
+            auto input = _create_int_block({base, base + 1, base + 2});
+            status = spill_writer->write_block(_runtime_state.get(), input);
+            ASSERT_TRUE(status.ok());
+        }
+
+        status = spill_writer->close();
+        ASSERT_TRUE(status.ok());
+    }
+
+    auto spill_reader = file->create_reader(_runtime_state.get(), _profile.get());
+    status = spill_reader->open();
+    ASSERT_TRUE(status.ok());
+
+    spill_reader->seek(2);
+
+    Block output;
+    bool eos = false;
+    status = spill_reader->read(&output, &eos);
+    ASSERT_TRUE(status.ok());
+    ASSERT_FALSE(eos);
+    ASSERT_EQ(output.rows(), 3);
+
+    // Block 2 was written as {20, 21, 22}.
+    auto values = output.get_by_position(0).column;
+    for (int row = 0; row < 3; ++row) {
+        ASSERT_EQ(values->get_int(row), 20 + row);
+    }
+
+    status = spill_reader->close();
+    ASSERT_TRUE(status.ok());
+}
+
+TEST_F(SpillFileTest, SeekBeyondEnd) {
+    // With only three blocks written, seeking to block 100 must make the next read
+    // report EOS rather than fail.
+    SpillFileSPtr file;
+    auto status = ExecEnv::GetInstance()->spill_file_mgr()->create_spill_file(
+            "test_query/seek_beyond", file);
+    ASSERT_TRUE(status.ok());
+
+    {
+        SpillFileWriterSPtr spill_writer;
+        status = file->create_writer(_runtime_state.get(), _profile.get(), spill_writer);
+        ASSERT_TRUE(status.ok());
+
+        // Three single-row blocks holding 0, 1 and 2.
+        for (int value = 0; value < 3; ++value) {
+            auto input = _create_int_block({value});
+            status = spill_writer->write_block(_runtime_state.get(), input);
+            ASSERT_TRUE(status.ok());
+        }
+
+        status = spill_writer->close();
+        ASSERT_TRUE(status.ok());
+    }
+
+    auto spill_reader = file->create_reader(_runtime_state.get(), _profile.get());
+    status = spill_reader->open();
+    ASSERT_TRUE(status.ok());
+
+    spill_reader->seek(100);
+
+    Block output;
+    bool eos = false;
+    status = spill_reader->read(&output, &eos);
+    ASSERT_TRUE(status.ok());
+    ASSERT_TRUE(eos);
+
+    status = spill_reader->close();
+    ASSERT_TRUE(status.ok());
+}
+
+// ═══════════════════════════════════════════════════════════════════════
+// SpillFile GC/lifecycle tests
+// ═══════════════════════════════════════════════════════════════════════
+
+TEST_F(SpillFileTest, GCCleansUpFiles) {
+    // Expects the spill file's directory under the data dir to exist while the
+    // SpillFile is alive, and to be gone once it has been destroyed.
+    std::string spill_file_dir;
+
+    {
+        SpillFileSPtr file;
+        auto status = ExecEnv::GetInstance()->spill_file_mgr()->create_spill_file(
+                "test_query/gc_test", file);
+        ASSERT_TRUE(status.ok());
+
+        SpillFileWriterSPtr spill_writer;
+        status = file->create_writer(_runtime_state.get(), _profile.get(), spill_writer);
+        ASSERT_TRUE(status.ok());
+
+        auto input = _create_int_block({1, 2, 3});
+        status = spill_writer->write_block(_runtime_state.get(), input);
+        ASSERT_TRUE(status.ok());
+
+        status = spill_writer->close();
+        ASSERT_TRUE(status.ok());
+
+        spill_file_dir = _data_dir_ptr->get_spill_data_path() + "/test_query/gc_test";
+
+        bool dir_present = false;
+        status = io::global_local_filesystem()->exists(spill_file_dir, &dir_present);
+        ASSERT_TRUE(status.ok());
+        ASSERT_TRUE(dir_present);
+
+        // `file` and `spill_writer` are destroyed when this scope ends.
+    }
+
+    bool dir_present = false;
+    auto status = io::global_local_filesystem()->exists(spill_file_dir, &dir_present);
+    ASSERT_TRUE(status.ok());
+    ASSERT_FALSE(dir_present);
+}
+
+TEST_F(SpillFileTest, DeleteSpillFileThroughManager) {
+    // Writes a small spill file, hands it to the manager for deletion, then runs
+    // the manager's gc(1000).
+    // NOTE(review): this only checks that the calls complete. It does not assert
+    // that the spill file's on-disk data is actually removed afterwards.
+    SpillFileSPtr file;
+    auto status = ExecEnv::GetInstance()->spill_file_mgr()->create_spill_file(
+            "test_query/mgr_delete", file);
+    ASSERT_TRUE(status.ok());
+
+    SpillFileWriterSPtr spill_writer;
+    status = file->create_writer(_runtime_state.get(), _profile.get(), spill_writer);
+    ASSERT_TRUE(status.ok());
+
+    auto input = _create_int_block({1, 2, 3});
+    status = spill_writer->write_block(_runtime_state.get(), input);
+    ASSERT_TRUE(status.ok());
+
+    status = spill_writer->close();
+    ASSERT_TRUE(status.ok());
+
+    ExecEnv::GetInstance()->spill_file_mgr()->delete_spill_file(file);
+    ExecEnv::GetInstance()->spill_file_mgr()->gc(1000);
+}
+
+// ═══════════════════════════════════════════════════════════════════════
+// SpillFileManager tests
+// ═══════════════════════════════════════════════════════════════════════
+
+TEST_F(SpillFileTest, ManagerNextId) {
+    // Three back-to-back next_id() calls are expected to return consecutive ids.
+    auto first = ExecEnv::GetInstance()->spill_file_mgr()->next_id();
+    auto second = ExecEnv::GetInstance()->spill_file_mgr()->next_id();
+    auto third = ExecEnv::GetInstance()->spill_file_mgr()->next_id();
+
+    ASSERT_EQ(second, first + 1);
+    ASSERT_EQ(third, second + 1);
+}
+
+TEST_F(SpillFileTest, ManagerCreateMultipleFiles) {
+    // Creates five independent spill files and writes one single-row block to
+    // each (file i holds i * 100). Then reads every file back and checks its value.
+    const int num_files = 5;
+    std::vector<SpillFileSPtr> files;
+
+    for (int idx = 0; idx < num_files; ++idx) {
+        SpillFileSPtr created;
+        auto status = ExecEnv::GetInstance()->spill_file_mgr()->create_spill_file(
+                fmt::format("test_query/multi_{}", idx), created);
+        ASSERT_TRUE(status.ok()) << "create file " << idx << " failed: " << status.to_string();
+        files.push_back(created);
+    }
+
+    // Write phase: one writer per file, closed before moving on.
+    int write_idx = 0;
+    for (const auto& file : files) {
+        SpillFileWriterSPtr spill_writer;
+        auto status = file->create_writer(_runtime_state.get(), _profile.get(), spill_writer);
+        ASSERT_TRUE(status.ok());
+
+        auto input = _create_int_block({write_idx * 100});
+        status = spill_writer->write_block(_runtime_state.get(), input);
+        ASSERT_TRUE(status.ok());
+
+        status = spill_writer->close();
+        ASSERT_TRUE(status.ok());
+        ++write_idx;
+    }
+
+    // Read phase: each file must return its own single value.
+    int read_idx = 0;
+    for (const auto& file : files) {
+        auto spill_reader = file->create_reader(_runtime_state.get(), _profile.get());
+        auto status = spill_reader->open();
+        ASSERT_TRUE(status.ok());
+
+        Block output;
+        bool eos = false;
+        status = spill_reader->read(&output, &eos);
+        ASSERT_TRUE(status.ok());
+        ASSERT_EQ(output.rows(), 1);
+
+        auto values = output.get_by_position(0).column;
+        ASSERT_EQ(values->get_int(0), read_idx * 100);
+
+        status = spill_reader->close();
+        ASSERT_TRUE(status.ok());
+        ++read_idx;
+    }
+}
+
+// ═══════════════════════════════════════════════════════════════════════
+// Profile counter tests
+// ═══════════════════════════════════════════════════════════════════════
+
+TEST_F(SpillFileTest, WriteCounters) {
+    // Writes two blocks (5 + 3 rows) and checks the spill write counters looked
+    // up on _custom_profile.
+    SpillFileSPtr file;
+    auto status = ExecEnv::GetInstance()->spill_file_mgr()->create_spill_file(
+            "test_query/counters", file);
+    ASSERT_TRUE(status.ok());
+
+    SpillFileWriterSPtr spill_writer;
+    status = file->create_writer(_runtime_state.get(), _profile.get(), spill_writer);
+    ASSERT_TRUE(status.ok());
+
+    auto first_input = _create_int_block({1, 2, 3, 4, 5});
+    status = spill_writer->write_block(_runtime_state.get(), first_input);
+    ASSERT_TRUE(status.ok());
+
+    auto second_input = _create_int_block({10, 20, 30});
+    status = spill_writer->write_block(_runtime_state.get(), second_input);
+    ASSERT_TRUE(status.ok());
+
+    status = spill_writer->close();
+    ASSERT_TRUE(status.ok());
+
+    auto* rows_counter = _custom_profile->get_counter("SpillWriteRows");
+    ASSERT_NE(rows_counter, nullptr);
+    ASSERT_EQ(rows_counter->value(), 8);
+
+    auto* blocks_counter = _custom_profile->get_counter("SpillWriteBlockCount");
+    ASSERT_NE(blocks_counter, nullptr);
+    ASSERT_EQ(blocks_counter->value(), 2);
+
+    auto* bytes_counter = _custom_profile->get_counter("SpillWriteFileBytes");
+    ASSERT_NE(bytes_counter, nullptr);
+    ASSERT_GT(bytes_counter->value(), 0);
+}
+
+TEST_F(SpillFileTest, ReadCounters) {
+    // Writes one 5-row block, reads it once, and checks the spill read counters
+    // looked up on _custom_profile.
+    SpillFileSPtr file;
+    auto status = ExecEnv::GetInstance()->spill_file_mgr()->create_spill_file(
+            "test_query/read_counters", file);
+    ASSERT_TRUE(status.ok());
+
+    {
+        SpillFileWriterSPtr spill_writer;
+        status = file->create_writer(_runtime_state.get(), _profile.get(), spill_writer);
+        ASSERT_TRUE(status.ok());
+
+        auto input = _create_int_block({1, 2, 3, 4, 5});
+        status = spill_writer->write_block(_runtime_state.get(), input);
+        ASSERT_TRUE(status.ok());
+
+        status = spill_writer->close();
+        ASSERT_TRUE(status.ok());
+    }
+
+    auto spill_reader = file->create_reader(_runtime_state.get(), _profile.get());
+    status = spill_reader->open();
+    ASSERT_TRUE(status.ok());
+
+    Block output;
+    bool eos = false;
+    status = spill_reader->read(&output, &eos);
+    ASSERT_TRUE(status.ok());
+
+    status = spill_reader->close();
+    ASSERT_TRUE(status.ok());
+
+    auto* blocks_counter = _custom_profile->get_counter("SpillReadBlockCount");
+    ASSERT_NE(blocks_counter, nullptr);
+    ASSERT_EQ(blocks_counter->value(), 1);
+
+    auto* rows_counter = _custom_profile->get_counter("SpillReadRows");
+    ASSERT_NE(rows_counter, nullptr);
+    ASSERT_EQ(rows_counter->value(), 5);
+
+    auto* bytes_counter = _custom_profile->get_counter("SpillReadFileBytes");
+    ASSERT_NE(bytes_counter, nullptr);
+    ASSERT_GT(bytes_counter->value(), 0);
+}
+
+// ═══════════════════════════════════════════════════════════════════════
+// SpillDataDir tests
+// ═══════════════════════════════════════════════════════════════════════
+
+TEST_F(SpillFileTest, DataDirCapacityTracking) {
+    // Writing a 1000-row block must increase the data dir's reported spill bytes.
+    SpillFileSPtr file;
+    auto status = ExecEnv::GetInstance()->spill_file_mgr()->create_spill_file(
+            "test_query/capacity", file);
+    ASSERT_TRUE(status.ok());
+
+    // Baseline is sampled after the file is created but before any data is written.
+    auto bytes_before = _data_dir_ptr->get_spill_data_bytes();
+
+    SpillFileWriterSPtr spill_writer;
+    status = file->create_writer(_runtime_state.get(), _profile.get(), spill_writer);
+    ASSERT_TRUE(status.ok());
+
+    std::vector<int32_t> values(1000);
+    std::iota(values.begin(), values.end(), 0);
+    auto input = _create_int_block(values);
+    status = spill_writer->write_block(_runtime_state.get(), input);
+    ASSERT_TRUE(status.ok());
+
+    status = spill_writer->close();
+    ASSERT_TRUE(status.ok());
+
+    auto bytes_after = _data_dir_ptr->get_spill_data_bytes();
+    ASSERT_GT(bytes_after, bytes_before);
+}
+
+} // namespace doris::vectorized
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]