This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 9e7204484af [improvement](spill) improve config and fix spill bugs
(#33519)
9e7204484af is described below
commit 9e7204484afcf6dc06c1810c5559dc8e82597afa
Author: TengJianPing <[email protected]>
AuthorDate: Tue Apr 16 15:46:13 2024 +0800
[improvement](spill) improve config and fix spill bugs (#33519)
---
be/src/common/config.cpp | 6 +-
be/src/common/config.h | 11 +-
be/src/olap/storage_engine.h | 2 +-
.../exec/partitioned_aggregation_sink_operator.cpp | 9 +-
.../partitioned_aggregation_source_operator.cpp | 13 +-
.../exec/partitioned_hash_join_probe_operator.cpp | 24 ++--
.../exec/partitioned_hash_join_sink_operator.cpp | 6 +-
be/src/pipeline/exec/spill_sort_sink_operator.cpp | 21 ++--
.../pipeline/exec/spill_sort_source_operator.cpp | 13 +-
be/src/runtime/exec_env_init.cpp | 8 +-
.../workload_group/workload_group_manager.cpp | 10 +-
be/src/vec/spill/spill_stream_manager.cpp | 134 +++++++++++++++------
be/src/vec/spill/spill_stream_manager.h | 53 ++++----
be/src/vec/spill/spill_writer.cpp | 14 +--
.../java/org/apache/doris/qe/SessionVariable.java | 42 ++++++-
15 files changed, 247 insertions(+), 119 deletions(-)
diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp
index 212233dcbae..13acfd49042 100644
--- a/be/src/common/config.cpp
+++ b/be/src/common/config.cpp
@@ -1160,13 +1160,13 @@ DEFINE_mDouble(high_disk_avail_level_diff_usages,
"0.15");
DEFINE_Int32(partition_disk_index_lru_size, "10000");
// limit the storage space that query spill files can use
DEFINE_String(spill_storage_root_path, "${DORIS_HOME}/storage");
-DEFINE_mInt64(spill_storage_limit, "10737418240"); // 10G
-DEFINE_mInt32(spill_gc_interval_ms, "2000"); // 2s
+DEFINE_String(spill_storage_limit, "20%"); // 20%
+DEFINE_mInt32(spill_gc_interval_ms, "2000"); // 2s
+DEFINE_mInt32(spill_gc_file_count, "2000");
DEFINE_Int32(spill_io_thread_pool_per_disk_thread_num, "2");
DEFINE_Int32(spill_io_thread_pool_queue_size, "1024");
DEFINE_Int32(spill_async_task_thread_pool_thread_num, "2");
DEFINE_Int32(spill_async_task_thread_pool_queue_size, "1024");
-DEFINE_mInt32(spill_mem_warning_water_mark_multiplier, "2");
DEFINE_mBool(check_segment_when_build_rowset_meta, "false");
diff --git a/be/src/common/config.h b/be/src/common/config.h
index b6b7e0c1d36..9f226e22be2 100644
--- a/be/src/common/config.h
+++ b/be/src/common/config.h
@@ -1242,13 +1242,20 @@ DECLARE_mDouble(high_disk_avail_level_diff_usages);
// create tablet in partition random robin idx lru size, default 10000
DECLARE_Int32(partition_disk_index_lru_size);
DECLARE_String(spill_storage_root_path);
-DECLARE_mInt64(spill_storage_limit);
+// Spill storage limit specified as number of bytes
+// ('<int>[bB]?'), megabytes ('<float>[mM]'), gigabytes ('<float>[gG]'),
+// or percentage of capacity ('<int>%').
+// Defaults to bytes if no unit is given.
+// Must be larger than 0.
+// If specified as percentage, the final limit value is:
+// disk_capacity_bytes * storage_flood_stage_usage_percent * spill_storage_limit
+DECLARE_String(spill_storage_limit);
DECLARE_mInt32(spill_gc_interval_ms);
+DECLARE_mInt32(spill_gc_file_count);
DECLARE_Int32(spill_io_thread_pool_per_disk_thread_num);
DECLARE_Int32(spill_io_thread_pool_queue_size);
DECLARE_Int32(spill_async_task_thread_pool_thread_num);
DECLARE_Int32(spill_async_task_thread_pool_queue_size);
-DECLARE_mInt32(spill_mem_warning_water_mark_multiplier);
DECLARE_mBool(check_segment_when_build_rowset_meta);
diff --git a/be/src/olap/storage_engine.h b/be/src/olap/storage_engine.h
index 6888af9ee17..362f899c81a 100644
--- a/be/src/olap/storage_engine.h
+++ b/be/src/olap/storage_engine.h
@@ -170,7 +170,7 @@ public:
// get all info of root_path
Status get_all_data_dir_info(std::vector<DataDirInfo>* data_dir_infos,
bool need_update);
- int64_t get_file_or_directory_size(const std::string& file_path);
+ static int64_t get_file_or_directory_size(const std::string& file_path);
// get root path for creating tablet. The returned vector of root path
should be round robin,
// for avoiding that all the tablet would be deployed one disk.
diff --git a/be/src/pipeline/exec/partitioned_aggregation_sink_operator.cpp
b/be/src/pipeline/exec/partitioned_aggregation_sink_operator.cpp
index 4ea531bade0..55a0650dc1f 100644
--- a/be/src/pipeline/exec/partitioned_aggregation_sink_operator.cpp
+++ b/be/src/pipeline/exec/partitioned_aggregation_sink_operator.cpp
@@ -249,14 +249,15 @@ Status
PartitionedAggSinkLocalState::revoke_memory(RuntimeState* state) {
auto execution_context = state->get_task_execution_context();
_shared_state_holder = _shared_state->shared_from_this();
+ auto query_id = state->query_id();
MonotonicStopWatch submit_timer;
submit_timer.start();
status =
ExecEnv::GetInstance()->spill_stream_mgr()->get_async_task_thread_pool()->submit_func(
- [this, &parent, state, execution_context, submit_timer] {
+ [this, &parent, state, query_id, execution_context, submit_timer] {
auto execution_context_lock = execution_context.lock();
if (!execution_context_lock) {
- LOG(INFO) << "query " << print_id(state->query_id())
+ LOG(INFO) << "query " << print_id(query_id)
<< " execution_context released, maybe query was
cancelled.";
return Status::Cancelled("Cancelled");
}
@@ -267,13 +268,13 @@ Status
PartitionedAggSinkLocalState::revoke_memory(RuntimeState* state) {
if (!_shared_state->sink_status.ok() ||
state->is_cancelled()) {
if (!_shared_state->sink_status.ok()) {
LOG(WARNING)
- << "query " << print_id(state->query_id())
<< " agg node "
+ << "query " << print_id(query_id) << " agg
node "
<< Base::_parent->id()
<< " revoke_memory error: " <<
Base::_shared_state->sink_status;
}
_shared_state->close();
} else {
- VLOG_DEBUG << "query " << print_id(state->query_id())
<< " agg node "
+ VLOG_DEBUG << "query " << print_id(query_id) << " agg
node "
<< Base::_parent->id() << " revoke_memory
finish"
<< ", eos: " << _eos;
}
diff --git a/be/src/pipeline/exec/partitioned_aggregation_source_operator.cpp
b/be/src/pipeline/exec/partitioned_aggregation_source_operator.cpp
index a2484cd6db4..a5753ef7654 100644
--- a/be/src/pipeline/exec/partitioned_aggregation_source_operator.cpp
+++ b/be/src/pipeline/exec/partitioned_aggregation_source_operator.cpp
@@ -205,16 +205,17 @@ Status
PartitionedAggLocalState::initiate_merge_spill_partition_agg_data(Runtime
auto execution_context = state->get_task_execution_context();
_shared_state_holder = _shared_state->shared_from_this();
+ auto query_id = state->query_id();
MonotonicStopWatch submit_timer;
submit_timer.start();
RETURN_IF_ERROR(
ExecEnv::GetInstance()->spill_stream_mgr()->get_async_task_thread_pool()->submit_func(
- [this, state, execution_context, submit_timer] {
+ [this, state, query_id, execution_context, submit_timer] {
auto execution_context_lock = execution_context.lock();
if (!execution_context_lock) {
- LOG(INFO) << "query " <<
print_id(state->query_id())
+ LOG(INFO) << "query " << print_id(query_id)
<< " execution_context released, maybe
query was cancelled.";
// FIXME: return status is meaningless?
return Status::Cancelled("Cancelled");
@@ -225,14 +226,14 @@ Status
PartitionedAggLocalState::initiate_merge_spill_partition_agg_data(Runtime
Defer defer {[&]() {
if (!_status.ok() || state->is_cancelled()) {
if (!_status.ok()) {
- LOG(WARNING) << "query " <<
print_id(state->query_id())
- << " agg node " <<
_parent->node_id()
+ LOG(WARNING) << "query " <<
print_id(query_id) << " agg node "
+ << _parent->node_id()
<< " merge spilled agg data
error: " << _status;
}
_shared_state->close();
} else if
(_shared_state->spill_partitions.empty()) {
- VLOG_DEBUG << "query " <<
print_id(state->query_id())
- << " agg node " <<
_parent->node_id()
+ VLOG_DEBUG << "query " << print_id(query_id)
<< " agg node "
+ << _parent->node_id()
<< " merge spilled agg data finish";
}
Base::_shared_state->in_mem_shared_state->aggregate_data_container
diff --git a/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp
b/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp
index 0f837f8bbda..35750ee5a55 100644
--- a/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp
+++ b/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp
@@ -493,6 +493,7 @@ Status
PartitionedHashJoinProbeOperatorX::prepare(RuntimeState* state) {
// to avoid prepare _child_x twice
auto child_x = std::move(_child_x);
RETURN_IF_ERROR(JoinProbeOperatorX::prepare(state));
+ RETURN_IF_ERROR(vectorized::VExpr::prepare(_output_expr_ctxs, state,
*_intermediate_row_desc));
RETURN_IF_ERROR(_inner_probe_operator->set_child(child_x));
DCHECK(_build_side_child != nullptr);
_inner_probe_operator->set_build_side_child(_build_side_child);
@@ -648,6 +649,8 @@ Status
PartitionedHashJoinProbeOperatorX::pull(doris::RuntimeState* state,
}
}
+ const auto partition_index = local_state._partition_cursor;
+ auto& probe_blocks = local_state._probe_blocks[partition_index];
if (local_state._need_to_setup_internal_operators) {
*eos = false;
bool has_data = false;
@@ -659,12 +662,13 @@ Status
PartitionedHashJoinProbeOperatorX::pull(doris::RuntimeState* state,
}
RETURN_IF_ERROR(_setup_internal_operators(local_state, state));
local_state._need_to_setup_internal_operators = false;
+ auto& mutable_block = local_state._partitioned_blocks[partition_index];
+ if (mutable_block && !mutable_block->empty()) {
+ probe_blocks.emplace_back(mutable_block->to_block());
+ }
}
-
- auto partition_index = local_state._partition_cursor;
- bool in_mem_eos_;
+ bool in_mem_eos = false;
auto* runtime_state = local_state._runtime_state.get();
- auto& probe_blocks = local_state._probe_blocks[partition_index];
while (_inner_probe_operator->need_more_input_data(runtime_state)) {
if (probe_blocks.empty()) {
*eos = false;
@@ -682,14 +686,16 @@ Status
PartitionedHashJoinProbeOperatorX::pull(doris::RuntimeState* state,
auto block = std::move(probe_blocks.back());
probe_blocks.pop_back();
- RETURN_IF_ERROR(_inner_probe_operator->push(runtime_state, &block,
false));
+ if (!block.empty()) {
+ RETURN_IF_ERROR(_inner_probe_operator->push(runtime_state, &block,
false));
+ }
}
RETURN_IF_ERROR(_inner_probe_operator->pull(local_state._runtime_state.get(),
output_block,
- &in_mem_eos_));
+ &in_mem_eos));
*eos = false;
- if (in_mem_eos_) {
+ if (in_mem_eos) {
local_state._partition_cursor++;
if (local_state._partition_cursor == _partition_count) {
*eos = true;
@@ -829,6 +835,10 @@ Status
PartitionedHashJoinProbeOperatorX::get_block(RuntimeState* state, vectori
RETURN_IF_ERROR(local_state.finish_spilling(0));
}
+ if (local_state._child_block->rows() == 0 && !local_state._child_eos) {
+ return Status::OK();
+ }
+
Defer defer([&] { local_state._child_block->clear_column_data(); });
if (need_to_spill) {
SCOPED_TIMER(local_state.exec_time_counter());
diff --git a/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp
b/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp
index 370606b1904..a0adf0505f8 100644
--- a/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp
+++ b/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp
@@ -93,6 +93,8 @@ Status
PartitionedHashJoinSinkLocalState::revoke_memory(RuntimeState* state) {
DCHECK_EQ(_spilling_streams_count, 0);
if (!_shared_state->need_to_spill) {
+ profile()->add_info_string("Spilled", "true");
+ _shared_state->need_to_spill = true;
auto& p = _parent->cast<PartitionedHashJoinSinkOperatorX>();
_shared_state->inner_shared_state->hash_table_variants.reset();
auto row_desc = p._child_x->row_desc();
@@ -172,7 +174,6 @@ Status
PartitionedHashJoinSinkLocalState::revoke_memory(RuntimeState* state) {
}
if (_spilling_streams_count > 0) {
- _shared_state->need_to_spill = true;
std::unique_lock<std::mutex> lock(_spill_lock);
if (_spilling_streams_count > 0) {
_dependency->block();
@@ -202,7 +203,8 @@ Status
PartitionedHashJoinSinkLocalState::_partition_block(RuntimeState* state,
SCOPED_TIMER(_partition_shuffle_timer);
auto* channel_ids =
reinterpret_cast<uint64_t*>(_partitioner->get_channel_ids());
std::vector<uint32_t> partition_indexes[p._partition_count];
- for (uint32_t i = 0; i != rows; ++i) {
+ DCHECK_LT(begin, end);
+ for (size_t i = begin; i != end; ++i) {
partition_indexes[channel_ids[i]].emplace_back(i);
}
diff --git a/be/src/pipeline/exec/spill_sort_sink_operator.cpp
b/be/src/pipeline/exec/spill_sort_sink_operator.cpp
index e83434250e0..fd63df92976 100644
--- a/be/src/pipeline/exec/spill_sort_sink_operator.cpp
+++ b/be/src/pipeline/exec/spill_sort_sink_operator.cpp
@@ -105,9 +105,10 @@ Status
SpillSortSinkLocalState::setup_in_memory_sort_op(RuntimeState* state) {
auto* sink_local_state = _runtime_state->get_sink_local_state();
DCHECK(sink_local_state != nullptr);
- _profile->add_info_string("TOP-N",
*sink_local_state->profile()->get_info_string("TOP-N"));
+ RETURN_IF_ERROR(sink_local_state->open(state));
- return sink_local_state->open(state);
+ _profile->add_info_string("TOP-N",
*sink_local_state->profile()->get_info_string("TOP-N"));
+ return Status::OK();
}
SpillSortSinkOperatorX::SpillSortSinkOperatorX(ObjectPool* pool, int
operator_id,
@@ -230,6 +231,7 @@ Status SpillSortSinkLocalState::revoke_memory(RuntimeState*
state) {
auto execution_context = state->get_task_execution_context();
_shared_state_holder = _shared_state->shared_from_this();
+ auto query_id = state->query_id();
MonotonicStopWatch submit_timer;
submit_timer.start();
@@ -237,10 +239,11 @@ Status
SpillSortSinkLocalState::revoke_memory(RuntimeState* state) {
status = ExecEnv::GetInstance()
->spill_stream_mgr()
->get_spill_io_thread_pool(_spilling_stream->get_spill_root_dir())
- ->submit_func([this, state, &parent, execution_context,
submit_timer] {
+ ->submit_func([this, state, query_id, &parent,
execution_context,
+ submit_timer] {
auto execution_context_lock =
execution_context.lock();
if (!execution_context_lock) {
- LOG(INFO) << "query " <<
print_id(state->query_id())
+ LOG(INFO) << "query " << print_id(query_id)
<< " execution_context released, maybe
query was cancelled.";
return Status::OK();
}
@@ -250,16 +253,14 @@ Status
SpillSortSinkLocalState::revoke_memory(RuntimeState* state) {
Defer defer {[&]() {
if (!_shared_state->sink_status.ok() ||
state->is_cancelled()) {
if (!_shared_state->sink_status.ok()) {
- LOG(WARNING) << "query " <<
print_id(state->query_id())
- << " sort node " <<
_parent->id()
- << " revoke memory error: "
+ LOG(WARNING) << "query " <<
print_id(query_id) << " sort node "
+ << _parent->id() << " revoke
memory error: "
<<
_shared_state->sink_status;
}
_shared_state->close();
} else {
- VLOG_DEBUG << "query " <<
print_id(state->query_id())
- << " sort node " << _parent->id()
- << " revoke memory finish";
+ VLOG_DEBUG << "query " << print_id(query_id)
<< " sort node "
+ << _parent->id() << " revoke
memory finish";
}
_spilling_stream->end_spill(_shared_state->sink_status);
diff --git a/be/src/pipeline/exec/spill_sort_source_operator.cpp
b/be/src/pipeline/exec/spill_sort_source_operator.cpp
index 707a1c3f5a1..37b76425b16 100644
--- a/be/src/pipeline/exec/spill_sort_source_operator.cpp
+++ b/be/src/pipeline/exec/spill_sort_source_operator.cpp
@@ -89,14 +89,15 @@ Status
SpillSortLocalState::initiate_merge_sort_spill_streams(RuntimeState* stat
auto execution_context = state->get_task_execution_context();
_shared_state_holder = _shared_state->shared_from_this();
+ auto query_id = state->query_id();
MonotonicStopWatch submit_timer;
submit_timer.start();
- auto spill_func = [this, state, &parent, execution_context, submit_timer] {
+ auto spill_func = [this, state, query_id, &parent, execution_context,
submit_timer] {
auto execution_context_lock = execution_context.lock();
if (!execution_context_lock) {
- LOG(INFO) << "query " << print_id(state->query_id())
+ LOG(INFO) << "query " << print_id(query_id)
<< " execution_context released, maybe query was
cancelled.";
return Status::OK();
}
@@ -107,7 +108,7 @@ Status
SpillSortLocalState::initiate_merge_sort_spill_streams(RuntimeState* stat
Defer defer {[&]() {
if (!_status.ok() || state->is_cancelled()) {
if (!_status.ok()) {
- LOG(WARNING) << "query " << print_id(state->query_id()) <<
" sort node "
+ LOG(WARNING) << "query " << print_id(query_id) << " sort
node "
<< _parent->node_id() << " merge spill data
error: " << _status;
}
_shared_state->close();
@@ -116,8 +117,8 @@ Status
SpillSortLocalState::initiate_merge_sort_spill_streams(RuntimeState* stat
}
_current_merging_streams.clear();
} else {
- VLOG_DEBUG << "query " << print_id(state->query_id()) << "
sort node "
- << _parent->node_id() << " merge spill data finish";
+ VLOG_DEBUG << "query " << print_id(query_id) << " sort node "
<< _parent->node_id()
+ << " merge spill data finish";
}
_dependency->Dependency::set_ready();
}};
@@ -125,7 +126,7 @@ Status
SpillSortLocalState::initiate_merge_sort_spill_streams(RuntimeState* stat
vectorized::SpillStreamSPtr tmp_stream;
while (!state->is_cancelled()) {
int max_stream_count = _calc_spill_blocks_to_merge();
- VLOG_DEBUG << "query " << print_id(state->query_id()) << " sort
node " << _parent->id()
+ VLOG_DEBUG << "query " << print_id(query_id) << " sort node " <<
_parent->id()
<< " merge spill streams, streams count: "
<< _shared_state->sorted_streams.size()
<< ", curren merge max stream count: " <<
max_stream_count;
diff --git a/be/src/runtime/exec_env_init.cpp b/be/src/runtime/exec_env_init.cpp
index e475ed10d5c..76c877c3695 100644
--- a/be/src/runtime/exec_env_init.cpp
+++ b/be/src/runtime/exec_env_init.cpp
@@ -151,6 +151,12 @@ Status ExecEnv::_init(const std::vector<StorePath>&
store_paths,
if (ready()) {
return Status::OK();
}
+ std::unordered_map<std::string, std::unique_ptr<vectorized::SpillDataDir>>
spill_store_map;
+ for (const auto& spill_path : spill_store_paths) {
+ spill_store_map.emplace(spill_path.path,
std::make_unique<vectorized::SpillDataDir>(
+ spill_path.path,
spill_path.capacity_bytes,
+
spill_path.storage_medium));
+ }
init_doris_metrics(store_paths);
_store_paths = store_paths;
_tmp_file_dirs = std::make_unique<segment_v2::TmpFileDirs>(_store_paths);
@@ -246,7 +252,7 @@ Status ExecEnv::_init(const std::vector<StorePath>&
store_paths,
_wal_manager = WalManager::create_shared(this,
config::group_commit_wal_path);
_dns_cache = new DNSCache();
_write_cooldown_meta_executors =
std::make_unique<WriteCooldownMetaExecutors>();
- _spill_stream_mgr = new vectorized::SpillStreamManager(spill_store_paths);
+ _spill_stream_mgr = new
vectorized::SpillStreamManager(std::move(spill_store_map));
_backend_client_cache->init_metrics("backend");
_frontend_client_cache->init_metrics("frontend");
_broker_client_cache->init_metrics("broker");
diff --git a/be/src/runtime/workload_group/workload_group_manager.cpp
b/be/src/runtime/workload_group/workload_group_manager.cpp
index 78e2cd1fe06..2e75218e9cf 100644
--- a/be/src/runtime/workload_group/workload_group_manager.cpp
+++ b/be/src/runtime/workload_group/workload_group_manager.cpp
@@ -174,7 +174,6 @@ void WorkloadGroupMgr::refresh_wg_memory_info() {
}
auto process_mem_used = doris::MemInfo::proc_mem_no_allocator_cache();
- auto sys_mem_available = doris::MemInfo::sys_mem_available();
if (proc_vm_rss < all_queries_mem_used) {
all_queries_mem_used = proc_vm_rss;
}
@@ -185,6 +184,7 @@ void WorkloadGroupMgr::refresh_wg_memory_info() {
// we count these cache memories equally on workload groups.
double ratio = (double)proc_vm_rss / (double)all_queries_mem_used;
if (ratio >= 1.25) {
+ auto sys_mem_available = doris::MemInfo::sys_mem_available();
std::string debug_msg = fmt::format(
"\nProcess Memory Summary: process_vm_rss: {}, process mem:
{}, sys mem available: "
"{}, all quries mem: {}",
@@ -192,7 +192,7 @@ void WorkloadGroupMgr::refresh_wg_memory_info() {
PrettyPrinter::print(process_mem_used, TUnit::BYTES),
PrettyPrinter::print(sys_mem_available, TUnit::BYTES),
PrettyPrinter::print(all_queries_mem_used, TUnit::BYTES));
- LOG_EVERY_N(INFO, 10) << debug_msg;
+ VLOG_EVERY_N(1, 10) << debug_msg;
}
for (auto& wg : _workload_groups) {
@@ -229,8 +229,10 @@ void WorkloadGroupMgr::refresh_wg_memory_info() {
PrettyPrinter::print(query_weighted_mem_limit,
TUnit::BYTES));
debug_msg += "\n Query Memory Summary:";
+ } else {
+ continue;
}
- // check where queries need to revoke memory for task group
+ // check whether queries need to revoke memory for task group
for (const auto& query : wg_queries) {
auto query_ctx = query.second.lock();
if (!query_ctx) {
@@ -253,7 +255,7 @@ void WorkloadGroupMgr::refresh_wg_memory_info() {
}
}
if (wg_mem_info.is_high_wartermark || wg_mem_info.is_low_wartermark) {
- LOG_EVERY_N(INFO, 10) << debug_msg;
+ VLOG_EVERY_N(1, 10) << debug_msg;
}
}
}
diff --git a/be/src/vec/spill/spill_stream_manager.cpp
b/be/src/vec/spill/spill_stream_manager.cpp
index 4abca15082c..0259bef33f3 100644
--- a/be/src/vec/spill/spill_stream_manager.cpp
+++ b/be/src/vec/spill/spill_stream_manager.cpp
@@ -27,43 +27,48 @@
#include <random>
#include <string>
+#include "common/logging.h"
#include "io/fs/file_system.h"
#include "io/fs/local_file_system.h"
#include "olap/olap_define.h"
#include "runtime/runtime_state.h"
+#include "util/parse_util.h"
+#include "util/pretty_printer.h"
#include "util/runtime_profile.h"
#include "util/time.h"
#include "vec/spill/spill_stream.h"
namespace doris::vectorized {
-SpillStreamManager::SpillStreamManager(const std::vector<StorePath>& paths)
- : _spill_store_paths(paths), _stop_background_threads_latch(1) {}
+SpillStreamManager::SpillStreamManager(
+ std::unordered_map<std::string,
std::unique_ptr<vectorized::SpillDataDir>>&&
+ spill_store_map)
+ : _spill_store_map(std::move(spill_store_map)),
_stop_background_threads_latch(1) {}
Status SpillStreamManager::init() {
LOG(INFO) << "init spill stream manager";
- RETURN_NOT_OK_STATUS_WITH_WARN(_init_spill_store_map(),
"_init_spill_store_map failed");
+ RETURN_IF_ERROR(_init_spill_store_map());
int spill_io_thread_count =
config::spill_io_thread_pool_per_disk_thread_num;
if (spill_io_thread_count <= 0) {
spill_io_thread_count = 2;
}
int pool_idx = 0;
- for (const auto& path : _spill_store_paths) {
- auto gc_dir_root_dir = fmt::format("{}/{}", path.path,
SPILL_GC_DIR_PREFIX);
+ for (const auto& [path, store] : _spill_store_map) {
+ auto gc_dir_root_dir = fmt::format("{}/{}", path, SPILL_GC_DIR_PREFIX);
bool exists = true;
RETURN_IF_ERROR(io::global_local_filesystem()->exists(gc_dir_root_dir,
&exists));
if (!exists) {
RETURN_IF_ERROR(io::global_local_filesystem()->create_directory(gc_dir_root_dir));
}
- auto spill_dir = fmt::format("{}/{}", path.path, SPILL_DIR_PREFIX);
+ auto spill_dir = fmt::format("{}/{}", path, SPILL_DIR_PREFIX);
RETURN_IF_ERROR(io::global_local_filesystem()->exists(spill_dir,
&exists));
if (!exists) {
RETURN_IF_ERROR(io::global_local_filesystem()->create_directory(spill_dir));
} else {
auto suffix = ToStringFromUnixMillis(UnixMillis());
- auto gc_dir = fmt::format("{}/{}/{}", path.path,
SPILL_GC_DIR_PREFIX, suffix);
+ auto gc_dir = fmt::format("{}/{}/{}", path, SPILL_GC_DIR_PREFIX,
suffix);
if (std::filesystem::exists(gc_dir)) {
LOG(WARNING) << "gc dir already exists: " << gc_dir;
}
@@ -78,7 +83,7 @@ Status SpillStreamManager::init() {
spill_data_size += dir_entry.file_size();
}
}
- path_to_spill_data_size_[path.path] = spill_data_size;
+ store->update_spill_data_usage(spill_data_size);
std::unique_ptr<ThreadPool> io_pool;
static_cast<void>(ThreadPoolBuilder(fmt::format("SpillIOThreadPool-{}",
pool_idx++))
@@ -86,7 +91,7 @@ Status SpillStreamManager::init() {
.set_max_threads(spill_io_thread_count)
.set_max_queue_size(config::spill_io_thread_pool_queue_size)
.build(&io_pool));
- path_to_io_thread_pool_[path.path] = std::move(io_pool);
+ path_to_io_thread_pool_[path] = std::move(io_pool);
}
static_cast<void>(ThreadPoolBuilder("SpillAsyncTaskThreadPool")
.set_min_threads(config::spill_async_task_thread_pool_thread_num)
@@ -105,21 +110,16 @@ Status SpillStreamManager::init() {
void SpillStreamManager::_spill_gc_thread_callback() {
while (!_stop_background_threads_latch.wait_for(
std::chrono::milliseconds(config::spill_gc_interval_ms))) {
- gc(2000);
+ gc(config::spill_gc_file_count);
+ for (auto& [path, dir] : _spill_store_map) {
+ static_cast<void>(dir->update_capacity());
+ }
}
}
Status SpillStreamManager::_init_spill_store_map() {
- for (const auto& path : _spill_store_paths) {
- auto store =
- std::make_unique<SpillDataDir>(path.path, path.capacity_bytes,
path.storage_medium);
- auto st = store->init();
- if (!st.ok()) {
- LOG(WARNING) << "Store load failed, status=" << st.to_string()
- << ", path=" << store->path();
- return st;
- }
- _spill_store_map.emplace(store->path(), std::move(store));
+ for (const auto& store : _spill_store_map) {
+ RETURN_IF_ERROR(store.second->init());
}
return Status::OK();
@@ -133,18 +133,23 @@ std::vector<SpillDataDir*>
SpillStreamManager::_get_stores_for_spill(
stores.push_back(store.get());
}
}
+ if (stores.empty()) {
+ return stores;
+ }
- std::sort(stores.begin(), stores.end(),
- [](SpillDataDir* a, SpillDataDir* b) { return a->get_usage(0) <
b->get_usage(0); });
+ std::sort(stores.begin(), stores.end(), [](SpillDataDir* a, SpillDataDir*
b) {
+ return a->_get_disk_usage(0) < b->_get_disk_usage(0);
+ });
size_t seventy_percent_index = stores.size();
size_t eighty_five_percent_index = stores.size();
for (size_t index = 0; index < stores.size(); index++) {
// If the usage of the store is less than 70%, we choose disk randomly.
- if (stores[index]->get_usage(0) > 0.7 && seventy_percent_index ==
stores.size()) {
+ if (stores[index]->_get_disk_usage(0) > 0.7 && seventy_percent_index
== stores.size()) {
seventy_percent_index = index;
}
- if (stores[index]->get_usage(0) > 0.85 && eighty_five_percent_index ==
stores.size()) {
+ if (stores[index]->_get_disk_usage(0) > 0.85 &&
+ eighty_five_percent_index == stores.size()) {
eighty_five_percent_index = index;
break;
}
@@ -153,9 +158,13 @@ std::vector<SpillDataDir*>
SpillStreamManager::_get_stores_for_spill(
std::random_device rd;
std::mt19937 g(rd());
std::shuffle(stores.begin(), stores.begin() + seventy_percent_index, g);
- std::shuffle(stores.begin() + seventy_percent_index, stores.begin() +
eighty_five_percent_index,
- g);
- std::shuffle(stores.begin() + eighty_five_percent_index, stores.end(), g);
+ if (seventy_percent_index != stores.size()) {
+ std::shuffle(stores.begin() + seventy_percent_index,
+ stores.begin() + eighty_five_percent_index, g);
+ }
+ if (eighty_five_percent_index != stores.size()) {
+ std::shuffle(stores.begin() + eighty_five_percent_index, stores.end(),
g);
+ }
return stores;
}
@@ -210,8 +219,8 @@ void SpillStreamManager::gc(int64_t max_file_count) {
bool exists = true;
int64_t count = 0;
- for (const auto& path : _spill_store_paths) {
- std::string gc_root_dir = fmt::format("{}/{}", path.path,
SPILL_GC_DIR_PREFIX);
+ for (const auto& [path, store_dir] : _spill_store_map) {
+ std::string gc_root_dir = fmt::format("{}/{}", path,
SPILL_GC_DIR_PREFIX);
std::error_code ec;
exists = std::filesystem::exists(gc_root_dir, ec);
@@ -243,7 +252,7 @@ void SpillStreamManager::gc(int64_t max_file_count) {
}
int64_t data_size = 0;
- Defer defer {[&]() { update_usage(path.path, -data_size); }};
+ Defer defer {[&]() {
store_dir->update_spill_data_usage(-data_size); }};
for (const auto& file : files) {
auto abs_file_path = fmt::format("{}/{}", abs_dir,
file.file_name);
@@ -257,11 +266,10 @@ void SpillStreamManager::gc(int64_t max_file_count) {
}
}
-SpillDataDir::SpillDataDir(const std::string& path, int64_t capacity_bytes,
+SpillDataDir::SpillDataDir(std::string path, int64_t capacity_bytes,
TStorageMedium::type storage_medium)
- : _path(path),
- _available_bytes(0),
- _disk_capacity_bytes(0),
+ : _path(std::move(path)),
+ _disk_capacity_bytes(capacity_bytes),
_storage_medium(storage_medium) {}
Status SpillDataDir::init() {
@@ -271,11 +279,45 @@ Status SpillDataDir::init() {
RETURN_NOT_OK_STATUS_WITH_WARN(Status::IOError("opendir failed,
path={}", _path),
"check file exist failed");
}
+ RETURN_IF_ERROR(update_capacity());
+ LOG(INFO) << fmt::format(
+ "spill storage path: {}, capacity: {}, limit: {}, available: "
+ "{}",
+ _path, PrettyPrinter::print_bytes(_disk_capacity_bytes),
+ PrettyPrinter::print_bytes(_spill_data_limit_bytes),
+ PrettyPrinter::print_bytes(_available_bytes));
+ return Status::OK();
+}
+
+Status SpillDataDir::update_capacity() {
+ std::lock_guard<std::mutex> l(_mutex);
+ RETURN_IF_ERROR(io::global_local_filesystem()->get_space_info(_path,
&_disk_capacity_bytes,
+
&_available_bytes));
+ auto disk_use_max_bytes = (int64_t)(_disk_capacity_bytes *
+
config::storage_flood_stage_usage_percent / (double)100);
+ bool is_percent = true;
+ _spill_data_limit_bytes =
ParseUtil::parse_mem_spec(config::spill_storage_limit, -1,
+ _disk_capacity_bytes,
&is_percent);
+ if (_spill_data_limit_bytes <= 0) {
+ auto err_msg = fmt::format("Failed to parse spill storage limit from
'{}'",
+ config::spill_storage_limit);
+ LOG(WARNING) << err_msg;
+ return Status::InvalidArgument(err_msg);
+ }
+ if (is_percent) {
+ _spill_data_limit_bytes =
+ (int64_t)(_spill_data_limit_bytes *
config::storage_flood_stage_usage_percent /
+ (double)100);
+ }
+ if (_spill_data_limit_bytes > disk_use_max_bytes) {
+ _spill_data_limit_bytes = disk_use_max_bytes;
+ }
return Status::OK();
}
-bool SpillDataDir::reach_capacity_limit(int64_t incoming_data_size) {
- double used_pct = get_usage(incoming_data_size);
+
+bool SpillDataDir::_reach_disk_capacity_limit(int64_t incoming_data_size) {
+ double used_pct = _get_disk_usage(incoming_data_size);
int64_t left_bytes = _available_bytes - incoming_data_size;
if (used_pct >= config::storage_flood_stage_usage_percent / 100.0 &&
left_bytes <= config::storage_flood_stage_left_capacity_bytes) {
@@ -285,4 +327,24 @@ bool SpillDataDir::reach_capacity_limit(int64_t
incoming_data_size) {
}
return false;
}
+bool SpillDataDir::reach_capacity_limit(int64_t incoming_data_size) {
+ std::lock_guard<std::mutex> l(_mutex);
+ if (_reach_disk_capacity_limit(incoming_data_size)) {
+ return true;
+ }
+ if (_spill_data_bytes + incoming_data_size > _spill_data_limit_bytes) {
+ LOG_EVERY_T(WARNING, 1) << fmt::format(
+ "spill data reach limit, path: {}, capacity: {}, limit: {},
used: {}, available: "
+ "{}, "
+ "incoming "
+ "bytes: {}",
+ _path, PrettyPrinter::print_bytes(_disk_capacity_bytes),
+ PrettyPrinter::print_bytes(_spill_data_limit_bytes),
+ PrettyPrinter::print_bytes(_spill_data_bytes),
+ PrettyPrinter::print_bytes(_available_bytes),
+ PrettyPrinter::print_bytes(incoming_data_size));
+ return true;
+ }
+ return false;
+}
} // namespace doris::vectorized
diff --git a/be/src/vec/spill/spill_stream_manager.h
b/be/src/vec/spill/spill_stream_manager.h
index f73f840458e..2d7350f775f 100644
--- a/be/src/vec/spill/spill_stream_manager.h
+++ b/be/src/vec/spill/spill_stream_manager.h
@@ -31,48 +31,65 @@ class RuntimeProfile;
namespace vectorized {
+class SpillStreamManager;
class SpillDataDir {
public:
- SpillDataDir(const std::string& path, int64_t capacity_bytes = -1,
+ SpillDataDir(std::string path, int64_t capacity_bytes,
TStorageMedium::type storage_medium = TStorageMedium::HDD);
Status init();
const std::string& path() const { return _path; }
- bool is_ssd_disk() const { return _storage_medium == TStorageMedium::SSD; }
-
TStorageMedium::type storage_medium() const { return _storage_medium; }
// check if the capacity reach the limit after adding the incoming data
// return true if limit reached, otherwise, return false.
- // TODO(cmy): for now we can not precisely calculate the capacity Doris
used,
- // so in order to avoid running out of disk capacity, we currently use the
actual
- // disk available capacity and total capacity to do the calculation.
- // So that the capacity Doris actually used may exceeds the user specified
capacity.
bool reach_capacity_limit(int64_t incoming_data_size);
Status update_capacity();
- double get_usage(int64_t incoming_data_size) const {
+ void update_spill_data_usage(int64_t incoming_data_size) {
+ std::lock_guard<std::mutex> l(_mutex);
+ _spill_data_bytes += incoming_data_size;
+ }
+
+ int64_t get_spill_data_bytes() {
+ std::lock_guard<std::mutex> l(_mutex);
+ return _spill_data_bytes;
+ }
+
+ int64_t get_spill_data_limit() {
+ std::lock_guard<std::mutex> l(_mutex);
+ return _spill_data_limit_bytes;
+ }
+
+private:
+ bool _reach_disk_capacity_limit(int64_t incoming_data_size);
+ double _get_disk_usage(int64_t incoming_data_size) const {
return _disk_capacity_bytes == 0
? 0
: (_disk_capacity_bytes - _available_bytes +
incoming_data_size) /
(double)_disk_capacity_bytes;
}
-private:
+ friend class SpillStreamManager;
std::string _path;
- // the actual available capacity of the disk of this data dir
- size_t _available_bytes;
+ // protect _disk_capacity_bytes, _available_bytes, _spill_data_limit_bytes, _spill_data_bytes
+ std::mutex _mutex;
// the actual capacity of the disk of this data dir
size_t _disk_capacity_bytes;
+ int64_t _spill_data_limit_bytes = 0;
+ // the actual available capacity of the disk of this data dir
+ size_t _available_bytes = 0;
+ int64_t _spill_data_bytes = 0;
TStorageMedium::type _storage_medium;
};
class SpillStreamManager {
public:
- SpillStreamManager(const std::vector<StorePath>& paths);
+ SpillStreamManager(std::unordered_map<std::string,
std::unique_ptr<vectorized::SpillDataDir>>&&
+ spill_store_map);
Status init();
@@ -93,16 +110,6 @@ public:
void gc(int64_t max_file_count);
- void update_usage(const std::string& path, int64_t incoming_data_size) {
- path_to_spill_data_size_[path] += incoming_data_size;
- }
-
- static bool reach_capacity_limit(size_t size, size_t incoming_data_size) {
- return size + incoming_data_size > config::spill_storage_limit;
- }
-
- int64_t spilled_data_size(const std::string& path) { return
path_to_spill_data_size_[path]; }
-
ThreadPool* get_spill_io_thread_pool(const std::string& path) const {
const auto it = path_to_io_thread_pool_.find(path);
DCHECK(it != path_to_io_thread_pool_.end());
@@ -115,13 +122,11 @@ private:
void _spill_gc_thread_callback();
std::vector<SpillDataDir*> _get_stores_for_spill(TStorageMedium::type
storage_medium);
- std::vector<StorePath> _spill_store_paths;
std::unordered_map<std::string, std::unique_ptr<SpillDataDir>>
_spill_store_map;
CountDownLatch _stop_background_threads_latch;
std::unique_ptr<ThreadPool> async_task_thread_pool_;
std::unordered_map<std::string, std::unique_ptr<ThreadPool>>
path_to_io_thread_pool_;
- std::unordered_map<std::string, std::atomic_int64_t>
path_to_spill_data_size_;
scoped_refptr<Thread> _spill_gc_thread;
std::atomic_uint64_t id_ = 0;
diff --git a/be/src/vec/spill/spill_writer.cpp
b/be/src/vec/spill/spill_writer.cpp
index f657f580074..a48fb0b1dcb 100644
--- a/be/src/vec/spill/spill_writer.cpp
+++ b/be/src/vec/spill/spill_writer.cpp
@@ -52,7 +52,7 @@ Status SpillWriter::close() {
total_written_bytes_ += meta_.size();
COUNTER_UPDATE(write_bytes_counter_, meta_.size());
-
ExecEnv::GetInstance()->spill_stream_mgr()->update_usage(data_dir_->path(),
meta_.size());
+ data_dir_->update_spill_data_usage(meta_.size());
RETURN_IF_ERROR(file_writer_->close());
@@ -112,21 +112,19 @@ Status SpillWriter::_write_internal(const Block& block,
size_t& written_bytes) {
"serialize spill data error. [path={}]", file_path_);
}
}
- auto* spill_stream_mgr = ExecEnv::GetInstance()->spill_stream_mgr();
- auto splled_data_size =
spill_stream_mgr->spilled_data_size(data_dir_->path());
- if (spill_stream_mgr->reach_capacity_limit(splled_data_size,
buff.size())) {
+ if (data_dir_->reach_capacity_limit(buff.size())) {
return Status::Error<ErrorCode::DISK_REACH_CAPACITY_LIMIT>(
"spill data total size exceed limit, path: {}, size limit:
{}, spill data "
"size: {}",
- data_dir_->path(),
PrettyPrinter::print_bytes(config::spill_storage_limit),
- PrettyPrinter::print_bytes(
-
spill_stream_mgr->spilled_data_size(data_dir_->path())));
+ data_dir_->path(),
+
PrettyPrinter::print_bytes(data_dir_->get_spill_data_limit()),
+
PrettyPrinter::print_bytes(data_dir_->get_spill_data_bytes()));
}
{
Defer defer {[&]() {
if (status.ok()) {
- spill_stream_mgr->update_usage(data_dir_->path(),
buff.size());
+ data_dir_->update_spill_data_usage(buff.size());
}
}};
{
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
index a5706622221..eb1d511f45e 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
@@ -1707,7 +1707,7 @@ public class SessionVariable implements Serializable,
Writable {
description = {"控制是否启用join算子落盘。默认为 false。",
"Controls whether to enable spill to disk of join
operation. "
+ "The default value is false."},
- needForward = true)
+ needForward = true, fuzzy = true)
public boolean enableJoinSpill = false;
@VariableMgr.VarAttr(
@@ -1715,7 +1715,7 @@ public class SessionVariable implements Serializable,
Writable {
description = {"控制是否启用排序算子落盘。默认为 false。",
"Controls whether to enable spill to disk of sort
operation. "
+ "The default value is false."},
- needForward = true)
+ needForward = true, fuzzy = true)
public boolean enableSortSpill = false;
@VariableMgr.VarAttr(
@@ -1723,14 +1723,14 @@ public class SessionVariable implements Serializable,
Writable {
description = {"控制是否启用聚合算子落盘。默认为 false。",
"Controls whether to enable spill to disk of aggregation
operation. "
+ "The default value is false."},
- needForward = true)
+ needForward = true, fuzzy = true)
public boolean enableAggSpill = false;
// If the memory consumption of sort node exceed this limit, will trigger spill to disk;
// Set to 0 to disable; min: 2M
public static final long MIN_EXTERNAL_SORT_BYTES_THRESHOLD = 2097152;
@VariableMgr.VarAttr(name = EXTERNAL_SORT_BYTES_THRESHOLD,
- checker = "checkExternalSortBytesThreshold", fuzzy = true)
+ checker = "checkExternalSortBytesThreshold", varType =
VariableAnnotation.DEPRECATED)
public long externalSortBytesThreshold = 0;
// Set to 0 to disable; min: 128M
@@ -1741,7 +1741,7 @@ public class SessionVariable implements Serializable,
Writable {
// The memory limit of streaming agg when spilling is enabled
// NOTE: streaming agg operator will not spill to disk.
- @VariableMgr.VarAttr(name = SPILL_STREAMING_AGG_MEM_LIMIT)
+ @VariableMgr.VarAttr(name = SPILL_STREAMING_AGG_MEM_LIMIT, fuzzy = true)
public long spillStreamingAggMemLimit = 268435456; //256MB
public static final int MIN_EXTERNAL_AGG_PARTITION_BITS = 4;
@@ -1880,6 +1880,38 @@ public class SessionVariable implements Serializable,
Writable {
// set random 1, 10, 100, 1000, 10000
this.topnOptLimitThreshold = (int) Math.pow(10, random.nextInt(5));
+
+ // for spill to disk
+ /*
+ if (Config.pull_request_id > 10000) {
+ if (Config.pull_request_id % 2 == 1) {
+ this.enablePipelineXEngine = true;
+ this.enableJoinSpill = true;
+ this.enableSortSpill = true;
+ this.enableAggSpill = true;
+
+ randomInt = random.nextInt(4);
+ switch (randomInt) {
+ case 0:
+ this.minRevocableMem = 0;
+ break;
+ case 1:
+ this.minRevocableMem = 1;
+ break;
+ case 2:
+ this.minRevocableMem = 1024 * 1024;
+ break;
+ default:
+ this.minRevocableMem = 100L * 1024 * 1024 * 1024;
+ break;
+ }
+ } else {
+ this.enableJoinSpill = false;
+ this.enableSortSpill = false;
+ this.enableAggSpill = false;
+ }
+ }
+ */
}
public String printFuzzyVariables() {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]