This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch branch-4.0-preview
in repository https://gitbox.apache.org/repos/asf/doris.git
commit 7a030a2e5ef5cbd233f0b2d5203adb333688cdb5 Author: TengJianPing <[email protected]> AuthorDate: Tue Apr 16 15:46:13 2024 +0800 [improvement](spill) improve config and fix spill bugs (#33519) --- be/src/common/config.cpp | 6 +- be/src/common/config.h | 11 +- be/src/olap/storage_engine.h | 2 +- .../exec/partitioned_aggregation_sink_operator.cpp | 9 +- .../partitioned_aggregation_source_operator.cpp | 13 +- .../exec/partitioned_hash_join_probe_operator.cpp | 24 ++-- .../exec/partitioned_hash_join_sink_operator.cpp | 6 +- be/src/pipeline/exec/spill_sort_sink_operator.cpp | 21 ++-- .../pipeline/exec/spill_sort_source_operator.cpp | 13 +- be/src/runtime/exec_env_init.cpp | 8 +- .../workload_group/workload_group_manager.cpp | 10 +- be/src/vec/spill/spill_stream_manager.cpp | 134 +++++++++++++++------ be/src/vec/spill/spill_stream_manager.h | 53 ++++---- be/src/vec/spill/spill_writer.cpp | 14 +-- .../java/org/apache/doris/qe/SessionVariable.java | 42 ++++++- 15 files changed, 247 insertions(+), 119 deletions(-) diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp index f29ec307914..8e856977888 100644 --- a/be/src/common/config.cpp +++ b/be/src/common/config.cpp @@ -1160,13 +1160,13 @@ DEFINE_mDouble(high_disk_avail_level_diff_usages, "0.15"); DEFINE_Int32(partition_disk_index_lru_size, "10000"); // limit the storage space that query spill files can use DEFINE_String(spill_storage_root_path, "${DORIS_HOME}/storage"); -DEFINE_mInt64(spill_storage_limit, "10737418240"); // 10G -DEFINE_mInt32(spill_gc_interval_ms, "2000"); // 2s +DEFINE_String(spill_storage_limit, "20%"); // 20% +DEFINE_mInt32(spill_gc_interval_ms, "2000"); // 2s +DEFINE_mInt32(spill_gc_file_count, "2000"); DEFINE_Int32(spill_io_thread_pool_per_disk_thread_num, "2"); DEFINE_Int32(spill_io_thread_pool_queue_size, "1024"); DEFINE_Int32(spill_async_task_thread_pool_thread_num, "2"); DEFINE_Int32(spill_async_task_thread_pool_queue_size, "1024"); -DEFINE_mInt32(spill_mem_warning_water_mark_multiplier, "2"); DEFINE_mBool(check_segment_when_build_rowset_meta, "false"); diff --git a/be/src/common/config.h b/be/src/common/config.h index 81fabfb9517..191ebcc4f3b 100644 --- a/be/src/common/config.h +++ b/be/src/common/config.h @@ -1242,13 +1242,20 @@ DECLARE_mDouble(high_disk_avail_level_diff_usages); // create tablet in partition random robin idx lru size, default 10000 DECLARE_Int32(partition_disk_index_lru_size); DECLARE_String(spill_storage_root_path); -DECLARE_mInt64(spill_storage_limit); +// Spill storage limit specified as number of bytes
+// ('<int>[bB]?'), megabytes ('<float>[mM]'), gigabytes ('<float>[gG]'),
+// or percentage of capacity ('<int>%').
+// Defaults to bytes if no unit is given.
+// Must be larger than 0.
+// If specified as percentage, the final limit value is: +// disk_capacity_bytes * storage_flood_stage_usage_percent * spill_storage_limit +DECLARE_String(spill_storage_limit); DECLARE_mInt32(spill_gc_interval_ms); +DECLARE_mInt32(spill_gc_file_count); DECLARE_Int32(spill_io_thread_pool_per_disk_thread_num); DECLARE_Int32(spill_io_thread_pool_queue_size); DECLARE_Int32(spill_async_task_thread_pool_thread_num); DECLARE_Int32(spill_async_task_thread_pool_queue_size); -DECLARE_mInt32(spill_mem_warning_water_mark_multiplier); DECLARE_mBool(check_segment_when_build_rowset_meta); diff --git a/be/src/olap/storage_engine.h b/be/src/olap/storage_engine.h index 6888af9ee17..362f899c81a 100644 --- a/be/src/olap/storage_engine.h +++ b/be/src/olap/storage_engine.h @@ -170,7 +170,7 @@ public: // get all info of root_path Status get_all_data_dir_info(std::vector<DataDirInfo>* data_dir_infos, bool need_update); - int64_t get_file_or_directory_size(const std::string& file_path); + static int64_t get_file_or_directory_size(const std::string& file_path); // get root path for creating tablet. The returned vector of root path should be round robin, // for avoiding that all the tablet would be deployed one disk. diff --git a/be/src/pipeline/exec/partitioned_aggregation_sink_operator.cpp b/be/src/pipeline/exec/partitioned_aggregation_sink_operator.cpp index 4ea531bade0..55a0650dc1f 100644 --- a/be/src/pipeline/exec/partitioned_aggregation_sink_operator.cpp +++ b/be/src/pipeline/exec/partitioned_aggregation_sink_operator.cpp @@ -249,14 +249,15 @@ Status PartitionedAggSinkLocalState::revoke_memory(RuntimeState* state) { auto execution_context = state->get_task_execution_context(); _shared_state_holder = _shared_state->shared_from_this(); + auto query_id = state->query_id(); MonotonicStopWatch submit_timer; submit_timer.start(); status = ExecEnv::GetInstance()->spill_stream_mgr()->get_async_task_thread_pool()->submit_func( - [this, &parent, state, execution_context, submit_timer] { + [this, &parent, state, query_id, execution_context, submit_timer] { auto execution_context_lock = execution_context.lock(); if (!execution_context_lock) { - LOG(INFO) << "query " << print_id(state->query_id()) + LOG(INFO) << "query " << print_id(query_id) << " execution_context released, maybe query was cancelled."; return Status::Cancelled("Cancelled"); } @@ -267,13 +268,13 @@ Status PartitionedAggSinkLocalState::revoke_memory(RuntimeState* state) { if (!_shared_state->sink_status.ok() || state->is_cancelled()) { if (!_shared_state->sink_status.ok()) { LOG(WARNING) - << "query " << print_id(state->query_id()) << " agg node " + << "query " << print_id(query_id) << " agg node " << Base::_parent->id() << " revoke_memory error: " << Base::_shared_state->sink_status; } _shared_state->close(); } else { - VLOG_DEBUG << "query " << print_id(state->query_id()) << " agg node " + VLOG_DEBUG << "query " << print_id(query_id) << " agg node " << Base::_parent->id() << " revoke_memory finish" << ", eos: " << _eos; } diff --git a/be/src/pipeline/exec/partitioned_aggregation_source_operator.cpp b/be/src/pipeline/exec/partitioned_aggregation_source_operator.cpp index a2484cd6db4..a5753ef7654 100644 --- a/be/src/pipeline/exec/partitioned_aggregation_source_operator.cpp +++ b/be/src/pipeline/exec/partitioned_aggregation_source_operator.cpp @@ -205,16 +205,17 @@ Status PartitionedAggLocalState::initiate_merge_spill_partition_agg_data(Runtime auto execution_context = state->get_task_execution_context(); _shared_state_holder = 
_shared_state->shared_from_this(); + auto query_id = state->query_id(); MonotonicStopWatch submit_timer; submit_timer.start(); RETURN_IF_ERROR( ExecEnv::GetInstance()->spill_stream_mgr()->get_async_task_thread_pool()->submit_func( - [this, state, execution_context, submit_timer] { + [this, state, query_id, execution_context, submit_timer] { auto execution_context_lock = execution_context.lock(); if (!execution_context_lock) { - LOG(INFO) << "query " << print_id(state->query_id()) + LOG(INFO) << "query " << print_id(query_id) << " execution_context released, maybe query was cancelled."; // FIXME: return status is meaningless? return Status::Cancelled("Cancelled"); @@ -225,14 +226,14 @@ Status PartitionedAggLocalState::initiate_merge_spill_partition_agg_data(Runtime Defer defer {[&]() { if (!_status.ok() || state->is_cancelled()) { if (!_status.ok()) { - LOG(WARNING) << "query " << print_id(state->query_id()) - << " agg node " << _parent->node_id() + LOG(WARNING) << "query " << print_id(query_id) << " agg node " + << _parent->node_id() << " merge spilled agg data error: " << _status; } _shared_state->close(); } else if (_shared_state->spill_partitions.empty()) { - VLOG_DEBUG << "query " << print_id(state->query_id()) - << " agg node " << _parent->node_id() + VLOG_DEBUG << "query " << print_id(query_id) << " agg node " + << _parent->node_id() << " merge spilled agg data finish"; } Base::_shared_state->in_mem_shared_state->aggregate_data_container diff --git a/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp b/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp index 0f837f8bbda..35750ee5a55 100644 --- a/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp +++ b/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp @@ -493,6 +493,7 @@ Status PartitionedHashJoinProbeOperatorX::prepare(RuntimeState* state) { // to avoid prepare _child_x twice auto child_x = std::move(_child_x); RETURN_IF_ERROR(JoinProbeOperatorX::prepare(state)); + RETURN_IF_ERROR(vectorized::VExpr::prepare(_output_expr_ctxs, state, *_intermediate_row_desc)); RETURN_IF_ERROR(_inner_probe_operator->set_child(child_x)); DCHECK(_build_side_child != nullptr); _inner_probe_operator->set_build_side_child(_build_side_child); @@ -648,6 +649,8 @@ Status PartitionedHashJoinProbeOperatorX::pull(doris::RuntimeState* state, } } + const auto partition_index = local_state._partition_cursor; + auto& probe_blocks = local_state._probe_blocks[partition_index]; if (local_state._need_to_setup_internal_operators) { *eos = false; bool has_data = false; @@ -659,12 +662,13 @@ Status PartitionedHashJoinProbeOperatorX::pull(doris::RuntimeState* state, } RETURN_IF_ERROR(_setup_internal_operators(local_state, state)); local_state._need_to_setup_internal_operators = false; + auto& mutable_block = local_state._partitioned_blocks[partition_index]; + if (mutable_block && !mutable_block->empty()) { + probe_blocks.emplace_back(mutable_block->to_block()); + } } - - auto partition_index = local_state._partition_cursor; - bool in_mem_eos_; + bool in_mem_eos = false; auto* runtime_state = local_state._runtime_state.get(); - auto& probe_blocks = local_state._probe_blocks[partition_index]; while (_inner_probe_operator->need_more_input_data(runtime_state)) { if (probe_blocks.empty()) { *eos = false; @@ -682,14 +686,16 @@ Status PartitionedHashJoinProbeOperatorX::pull(doris::RuntimeState* state, auto block = std::move(probe_blocks.back()); probe_blocks.pop_back(); - RETURN_IF_ERROR(_inner_probe_operator->push(runtime_state, &block, 
false)); + if (!block.empty()) { + RETURN_IF_ERROR(_inner_probe_operator->push(runtime_state, &block, false)); + } } RETURN_IF_ERROR(_inner_probe_operator->pull(local_state._runtime_state.get(), output_block, - &in_mem_eos_)); + &in_mem_eos)); *eos = false; - if (in_mem_eos_) { + if (in_mem_eos) { local_state._partition_cursor++; if (local_state._partition_cursor == _partition_count) { *eos = true; @@ -829,6 +835,10 @@ Status PartitionedHashJoinProbeOperatorX::get_block(RuntimeState* state, vectori RETURN_IF_ERROR(local_state.finish_spilling(0)); } + if (local_state._child_block->rows() == 0 && !local_state._child_eos) { + return Status::OK(); + } + Defer defer([&] { local_state._child_block->clear_column_data(); }); if (need_to_spill) { SCOPED_TIMER(local_state.exec_time_counter()); diff --git a/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp b/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp index 370606b1904..a0adf0505f8 100644 --- a/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp +++ b/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp @@ -93,6 +93,8 @@ Status PartitionedHashJoinSinkLocalState::revoke_memory(RuntimeState* state) { DCHECK_EQ(_spilling_streams_count, 0); if (!_shared_state->need_to_spill) { + profile()->add_info_string("Spilled", "true"); + _shared_state->need_to_spill = true; auto& p = _parent->cast<PartitionedHashJoinSinkOperatorX>(); _shared_state->inner_shared_state->hash_table_variants.reset(); auto row_desc = p._child_x->row_desc(); @@ -172,7 +174,6 @@ Status PartitionedHashJoinSinkLocalState::revoke_memory(RuntimeState* state) { } if (_spilling_streams_count > 0) { - _shared_state->need_to_spill = true; std::unique_lock<std::mutex> lock(_spill_lock); if (_spilling_streams_count > 0) { _dependency->block(); @@ -202,7 +203,8 @@ Status PartitionedHashJoinSinkLocalState::_partition_block(RuntimeState* state, SCOPED_TIMER(_partition_shuffle_timer); auto* channel_ids = reinterpret_cast<uint64_t*>(_partitioner->get_channel_ids()); std::vector<uint32_t> partition_indexes[p._partition_count]; - for (uint32_t i = 0; i != rows; ++i) { + DCHECK_LT(begin, end); + for (size_t i = begin; i != end; ++i) { partition_indexes[channel_ids[i]].emplace_back(i); } diff --git a/be/src/pipeline/exec/spill_sort_sink_operator.cpp b/be/src/pipeline/exec/spill_sort_sink_operator.cpp index 78c5c9f51e9..cc80fc205d7 100644 --- a/be/src/pipeline/exec/spill_sort_sink_operator.cpp +++ b/be/src/pipeline/exec/spill_sort_sink_operator.cpp @@ -106,9 +106,10 @@ Status SpillSortSinkLocalState::setup_in_memory_sort_op(RuntimeState* state) { auto* sink_local_state = _runtime_state->get_sink_local_state(); DCHECK(sink_local_state != nullptr); - _profile->add_info_string("TOP-N", *sink_local_state->profile()->get_info_string("TOP-N")); + RETURN_IF_ERROR(sink_local_state->open(state)); - return sink_local_state->open(state); + _profile->add_info_string("TOP-N", *sink_local_state->profile()->get_info_string("TOP-N")); + return Status::OK(); } SpillSortSinkOperatorX::SpillSortSinkOperatorX(ObjectPool* pool, int operator_id, @@ -231,6 +232,7 @@ Status SpillSortSinkLocalState::revoke_memory(RuntimeState* state) { auto execution_context = state->get_task_execution_context(); _shared_state_holder = _shared_state->shared_from_this(); + auto query_id = state->query_id(); MonotonicStopWatch submit_timer; submit_timer.start(); @@ -238,10 +240,11 @@ Status SpillSortSinkLocalState::revoke_memory(RuntimeState* state) { status = ExecEnv::GetInstance() ->spill_stream_mgr() 
->get_spill_io_thread_pool(_spilling_stream->get_spill_root_dir()) - ->submit_func([this, state, &parent, execution_context, submit_timer] { + ->submit_func([this, state, query_id, &parent, execution_context, + submit_timer] { auto execution_context_lock = execution_context.lock(); if (!execution_context_lock) { - LOG(INFO) << "query " << print_id(state->query_id()) + LOG(INFO) << "query " << print_id(query_id) << " execution_context released, maybe query was cancelled."; return Status::OK(); } @@ -251,16 +254,14 @@ Status SpillSortSinkLocalState::revoke_memory(RuntimeState* state) { Defer defer {[&]() { if (!_shared_state->sink_status.ok() || state->is_cancelled()) { if (!_shared_state->sink_status.ok()) { - LOG(WARNING) << "query " << print_id(state->query_id()) - << " sort node " << _parent->id() - << " revoke memory error: " + LOG(WARNING) << "query " << print_id(query_id) << " sort node " + << _parent->id() << " revoke memory error: " << _shared_state->sink_status; } _shared_state->close(); } else { - VLOG_DEBUG << "query " << print_id(state->query_id()) - << " sort node " << _parent->id() - << " revoke memory finish"; + VLOG_DEBUG << "query " << print_id(query_id) << " sort node " + << _parent->id() << " revoke memory finish"; } _spilling_stream->end_spill(_shared_state->sink_status); diff --git a/be/src/pipeline/exec/spill_sort_source_operator.cpp b/be/src/pipeline/exec/spill_sort_source_operator.cpp index 5edb0daf7fc..107868f968d 100644 --- a/be/src/pipeline/exec/spill_sort_source_operator.cpp +++ b/be/src/pipeline/exec/spill_sort_source_operator.cpp @@ -85,14 +85,15 @@ Status SpillSortLocalState::initiate_merge_sort_spill_streams(RuntimeState* stat auto execution_context = state->get_task_execution_context(); _shared_state_holder = _shared_state->shared_from_this(); + auto query_id = state->query_id(); MonotonicStopWatch submit_timer; submit_timer.start(); - auto spill_func = [this, state, &parent, execution_context, submit_timer] { + auto spill_func = [this, state, query_id, &parent, execution_context, submit_timer] { auto execution_context_lock = execution_context.lock(); if (!execution_context_lock) { - LOG(INFO) << "query " << print_id(state->query_id()) + LOG(INFO) << "query " << print_id(query_id) << " execution_context released, maybe query was cancelled."; return Status::OK(); } @@ -103,7 +104,7 @@ Status SpillSortLocalState::initiate_merge_sort_spill_streams(RuntimeState* stat Defer defer {[&]() { if (!_status.ok() || state->is_cancelled()) { if (!_status.ok()) { - LOG(WARNING) << "query " << print_id(state->query_id()) << " sort node " + LOG(WARNING) << "query " << print_id(query_id) << " sort node " << _parent->node_id() << " merge spill data error: " << _status; } _shared_state->close(); @@ -112,8 +113,8 @@ Status SpillSortLocalState::initiate_merge_sort_spill_streams(RuntimeState* stat } _current_merging_streams.clear(); } else { - VLOG_DEBUG << "query " << print_id(state->query_id()) << " sort node " - << _parent->node_id() << " merge spill data finish"; + VLOG_DEBUG << "query " << print_id(query_id) << " sort node " << _parent->node_id() + << " merge spill data finish"; } _dependency->Dependency::set_ready(); }}; @@ -121,7 +122,7 @@ Status SpillSortLocalState::initiate_merge_sort_spill_streams(RuntimeState* stat vectorized::SpillStreamSPtr tmp_stream; while (!state->is_cancelled()) { int max_stream_count = _calc_spill_blocks_to_merge(); - VLOG_DEBUG << "query " << print_id(state->query_id()) << " sort node " << _parent->id() + VLOG_DEBUG << "query " << 
print_id(query_id) << " sort node " << _parent->id() << " merge spill streams, streams count: " << _shared_state->sorted_streams.size() << ", curren merge max stream count: " << max_stream_count; diff --git a/be/src/runtime/exec_env_init.cpp b/be/src/runtime/exec_env_init.cpp index e475ed10d5c..76c877c3695 100644 --- a/be/src/runtime/exec_env_init.cpp +++ b/be/src/runtime/exec_env_init.cpp @@ -151,6 +151,12 @@ Status ExecEnv::_init(const std::vector<StorePath>& store_paths, if (ready()) { return Status::OK(); } + std::unordered_map<std::string, std::unique_ptr<vectorized::SpillDataDir>> spill_store_map; + for (const auto& spill_path : spill_store_paths) { + spill_store_map.emplace(spill_path.path, std::make_unique<vectorized::SpillDataDir>( + spill_path.path, spill_path.capacity_bytes, + spill_path.storage_medium)); + } init_doris_metrics(store_paths); _store_paths = store_paths; _tmp_file_dirs = std::make_unique<segment_v2::TmpFileDirs>(_store_paths); @@ -246,7 +252,7 @@ Status ExecEnv::_init(const std::vector<StorePath>& store_paths, _wal_manager = WalManager::create_shared(this, config::group_commit_wal_path); _dns_cache = new DNSCache(); _write_cooldown_meta_executors = std::make_unique<WriteCooldownMetaExecutors>(); - _spill_stream_mgr = new vectorized::SpillStreamManager(spill_store_paths); + _spill_stream_mgr = new vectorized::SpillStreamManager(std::move(spill_store_map)); _backend_client_cache->init_metrics("backend"); _frontend_client_cache->init_metrics("frontend"); _broker_client_cache->init_metrics("broker"); diff --git a/be/src/runtime/workload_group/workload_group_manager.cpp b/be/src/runtime/workload_group/workload_group_manager.cpp index 78e2cd1fe06..2e75218e9cf 100644 --- a/be/src/runtime/workload_group/workload_group_manager.cpp +++ b/be/src/runtime/workload_group/workload_group_manager.cpp @@ -174,7 +174,6 @@ void WorkloadGroupMgr::refresh_wg_memory_info() { } auto process_mem_used = doris::MemInfo::proc_mem_no_allocator_cache(); - auto sys_mem_available = doris::MemInfo::sys_mem_available(); if (proc_vm_rss < all_queries_mem_used) { all_queries_mem_used = proc_vm_rss; } @@ -185,6 +184,7 @@ void WorkloadGroupMgr::refresh_wg_memory_info() { // we count these cache memories equally on workload groups. 
double ratio = (double)proc_vm_rss / (double)all_queries_mem_used; if (ratio >= 1.25) { + auto sys_mem_available = doris::MemInfo::sys_mem_available(); std::string debug_msg = fmt::format( "\nProcess Memory Summary: process_vm_rss: {}, process mem: {}, sys mem available: " "{}, all quries mem: {}", @@ -192,7 +192,7 @@ void WorkloadGroupMgr::refresh_wg_memory_info() { PrettyPrinter::print(process_mem_used, TUnit::BYTES), PrettyPrinter::print(sys_mem_available, TUnit::BYTES), PrettyPrinter::print(all_queries_mem_used, TUnit::BYTES)); - LOG_EVERY_N(INFO, 10) << debug_msg; + VLOG_EVERY_N(1, 10) << debug_msg; } for (auto& wg : _workload_groups) { @@ -229,8 +229,10 @@ void WorkloadGroupMgr::refresh_wg_memory_info() { PrettyPrinter::print(query_weighted_mem_limit, TUnit::BYTES)); debug_msg += "\n Query Memory Summary:"; + } else { + continue; } - // check where queries need to revoke memory for task group + // check whether queries need to revoke memory for task group for (const auto& query : wg_queries) { auto query_ctx = query.second.lock(); if (!query_ctx) { @@ -253,7 +255,7 @@ void WorkloadGroupMgr::refresh_wg_memory_info() { } } if (wg_mem_info.is_high_wartermark || wg_mem_info.is_low_wartermark) { - LOG_EVERY_N(INFO, 10) << debug_msg; + VLOG_EVERY_N(1, 10) << debug_msg; } } } diff --git a/be/src/vec/spill/spill_stream_manager.cpp b/be/src/vec/spill/spill_stream_manager.cpp index 4abca15082c..0259bef33f3 100644 --- a/be/src/vec/spill/spill_stream_manager.cpp +++ b/be/src/vec/spill/spill_stream_manager.cpp @@ -27,43 +27,48 @@ #include <random> #include <string> +#include "common/logging.h" #include "io/fs/file_system.h" #include "io/fs/local_file_system.h" #include "olap/olap_define.h" #include "runtime/runtime_state.h" +#include "util/parse_util.h" +#include "util/pretty_printer.h" #include "util/runtime_profile.h" #include "util/time.h" #include "vec/spill/spill_stream.h" namespace doris::vectorized { -SpillStreamManager::SpillStreamManager(const std::vector<StorePath>& paths) - : _spill_store_paths(paths), _stop_background_threads_latch(1) {} +SpillStreamManager::SpillStreamManager( + std::unordered_map<std::string, std::unique_ptr<vectorized::SpillDataDir>>&& + spill_store_map) + : _spill_store_map(std::move(spill_store_map)), _stop_background_threads_latch(1) {} Status SpillStreamManager::init() { LOG(INFO) << "init spill stream manager"; - RETURN_NOT_OK_STATUS_WITH_WARN(_init_spill_store_map(), "_init_spill_store_map failed"); + RETURN_IF_ERROR(_init_spill_store_map()); int spill_io_thread_count = config::spill_io_thread_pool_per_disk_thread_num; if (spill_io_thread_count <= 0) { spill_io_thread_count = 2; } int pool_idx = 0; - for (const auto& path : _spill_store_paths) { - auto gc_dir_root_dir = fmt::format("{}/{}", path.path, SPILL_GC_DIR_PREFIX); + for (const auto& [path, store] : _spill_store_map) { + auto gc_dir_root_dir = fmt::format("{}/{}", path, SPILL_GC_DIR_PREFIX); bool exists = true; RETURN_IF_ERROR(io::global_local_filesystem()->exists(gc_dir_root_dir, &exists)); if (!exists) { RETURN_IF_ERROR(io::global_local_filesystem()->create_directory(gc_dir_root_dir)); } - auto spill_dir = fmt::format("{}/{}", path.path, SPILL_DIR_PREFIX); + auto spill_dir = fmt::format("{}/{}", path, SPILL_DIR_PREFIX); RETURN_IF_ERROR(io::global_local_filesystem()->exists(spill_dir, &exists)); if (!exists) { RETURN_IF_ERROR(io::global_local_filesystem()->create_directory(spill_dir)); } else { auto suffix = ToStringFromUnixMillis(UnixMillis()); - auto gc_dir = fmt::format("{}/{}/{}", path.path, 
SPILL_GC_DIR_PREFIX, suffix); + auto gc_dir = fmt::format("{}/{}/{}", path, SPILL_GC_DIR_PREFIX, suffix); if (std::filesystem::exists(gc_dir)) { LOG(WARNING) << "gc dir already exists: " << gc_dir; } @@ -78,7 +83,7 @@ Status SpillStreamManager::init() { spill_data_size += dir_entry.file_size(); } } - path_to_spill_data_size_[path.path] = spill_data_size; + store->update_spill_data_usage(spill_data_size); std::unique_ptr<ThreadPool> io_pool; static_cast<void>(ThreadPoolBuilder(fmt::format("SpillIOThreadPool-{}", pool_idx++)) @@ -86,7 +91,7 @@ Status SpillStreamManager::init() { .set_max_threads(spill_io_thread_count) .set_max_queue_size(config::spill_io_thread_pool_queue_size) .build(&io_pool)); - path_to_io_thread_pool_[path.path] = std::move(io_pool); + path_to_io_thread_pool_[path] = std::move(io_pool); } static_cast<void>(ThreadPoolBuilder("SpillAsyncTaskThreadPool") .set_min_threads(config::spill_async_task_thread_pool_thread_num) @@ -105,21 +110,16 @@ Status SpillStreamManager::init() { void SpillStreamManager::_spill_gc_thread_callback() { while (!_stop_background_threads_latch.wait_for( std::chrono::milliseconds(config::spill_gc_interval_ms))) { - gc(2000); + gc(config::spill_gc_file_count); + for (auto& [path, dir] : _spill_store_map) { + static_cast<void>(dir->update_capacity()); + } } } Status SpillStreamManager::_init_spill_store_map() { - for (const auto& path : _spill_store_paths) { - auto store = - std::make_unique<SpillDataDir>(path.path, path.capacity_bytes, path.storage_medium); - auto st = store->init(); - if (!st.ok()) { - LOG(WARNING) << "Store load failed, status=" << st.to_string() - << ", path=" << store->path(); - return st; - } - _spill_store_map.emplace(store->path(), std::move(store)); + for (const auto& store : _spill_store_map) { + RETURN_IF_ERROR(store.second->init()); } return Status::OK(); @@ -133,18 +133,23 @@ std::vector<SpillDataDir*> SpillStreamManager::_get_stores_for_spill( stores.push_back(store.get()); } } + if (stores.empty()) { + return stores; + } - std::sort(stores.begin(), stores.end(), - [](SpillDataDir* a, SpillDataDir* b) { return a->get_usage(0) < b->get_usage(0); }); + std::sort(stores.begin(), stores.end(), [](SpillDataDir* a, SpillDataDir* b) { + return a->_get_disk_usage(0) < b->_get_disk_usage(0); + }); size_t seventy_percent_index = stores.size(); size_t eighty_five_percent_index = stores.size(); for (size_t index = 0; index < stores.size(); index++) { // If the usage of the store is less than 70%, we choose disk randomly. 
- if (stores[index]->get_usage(0) > 0.7 && seventy_percent_index == stores.size()) { + if (stores[index]->_get_disk_usage(0) > 0.7 && seventy_percent_index == stores.size()) { seventy_percent_index = index; } - if (stores[index]->get_usage(0) > 0.85 && eighty_five_percent_index == stores.size()) { + if (stores[index]->_get_disk_usage(0) > 0.85 && + eighty_five_percent_index == stores.size()) { eighty_five_percent_index = index; break; } @@ -153,9 +158,13 @@ std::vector<SpillDataDir*> SpillStreamManager::_get_stores_for_spill( std::random_device rd; std::mt19937 g(rd()); std::shuffle(stores.begin(), stores.begin() + seventy_percent_index, g); - std::shuffle(stores.begin() + seventy_percent_index, stores.begin() + eighty_five_percent_index, - g); - std::shuffle(stores.begin() + eighty_five_percent_index, stores.end(), g); + if (seventy_percent_index != stores.size()) { + std::shuffle(stores.begin() + seventy_percent_index, + stores.begin() + eighty_five_percent_index, g); + } + if (eighty_five_percent_index != stores.size()) { + std::shuffle(stores.begin() + eighty_five_percent_index, stores.end(), g); + } return stores; } @@ -210,8 +219,8 @@ void SpillStreamManager::gc(int64_t max_file_count) { bool exists = true; int64_t count = 0; - for (const auto& path : _spill_store_paths) { - std::string gc_root_dir = fmt::format("{}/{}", path.path, SPILL_GC_DIR_PREFIX); + for (const auto& [path, store_dir] : _spill_store_map) { + std::string gc_root_dir = fmt::format("{}/{}", path, SPILL_GC_DIR_PREFIX); std::error_code ec; exists = std::filesystem::exists(gc_root_dir, ec); @@ -243,7 +252,7 @@ void SpillStreamManager::gc(int64_t max_file_count) { } int64_t data_size = 0; - Defer defer {[&]() { update_usage(path.path, -data_size); }}; + Defer defer {[&]() { store_dir->update_spill_data_usage(-data_size); }}; for (const auto& file : files) { auto abs_file_path = fmt::format("{}/{}", abs_dir, file.file_name); @@ -257,11 +266,10 @@ void SpillStreamManager::gc(int64_t max_file_count) { } } -SpillDataDir::SpillDataDir(const std::string& path, int64_t capacity_bytes, +SpillDataDir::SpillDataDir(std::string path, int64_t capacity_bytes, TStorageMedium::type storage_medium) - : _path(path), - _available_bytes(0), - _disk_capacity_bytes(0), + : _path(std::move(path)), + _disk_capacity_bytes(capacity_bytes), _storage_medium(storage_medium) {} Status SpillDataDir::init() { @@ -271,11 +279,45 @@ Status SpillDataDir::init() { RETURN_NOT_OK_STATUS_WITH_WARN(Status::IOError("opendir failed, path={}", _path), "check file exist failed"); } + RETURN_IF_ERROR(update_capacity()); + LOG(INFO) << fmt::format( + "spill storage path: {}, capacity: {}, limit: {}, available: " + "{}", + _path, PrettyPrinter::print_bytes(_disk_capacity_bytes), + PrettyPrinter::print_bytes(_spill_data_limit_bytes), + PrettyPrinter::print_bytes(_available_bytes)); + return Status::OK(); +} + +Status SpillDataDir::update_capacity() { + std::lock_guard<std::mutex> l(_mutex); + RETURN_IF_ERROR(io::global_local_filesystem()->get_space_info(_path, &_disk_capacity_bytes, + &_available_bytes)); + auto disk_use_max_bytes = (int64_t)(_disk_capacity_bytes * + config::storage_flood_stage_usage_percent / (double)100); + bool is_percent = true; + _spill_data_limit_bytes = ParseUtil::parse_mem_spec(config::spill_storage_limit, -1, + _disk_capacity_bytes, &is_percent); + if (_spill_data_limit_bytes <= 0) { + auto err_msg = fmt::format("Failed to parse spill storage limit from '{}'", + config::spill_storage_limit); + LOG(WARNING) << err_msg; + return 
Status::InvalidArgument(err_msg); + } + if (is_percent) { + _spill_data_limit_bytes = + (int64_t)(_spill_data_limit_bytes * config::storage_flood_stage_usage_percent / + (double)100); + } + if (_spill_data_limit_bytes > disk_use_max_bytes) { + _spill_data_limit_bytes = disk_use_max_bytes; + } return Status::OK(); } -bool SpillDataDir::reach_capacity_limit(int64_t incoming_data_size) { - double used_pct = get_usage(incoming_data_size); + +bool SpillDataDir::_reach_disk_capacity_limit(int64_t incoming_data_size) { + double used_pct = _get_disk_usage(incoming_data_size); int64_t left_bytes = _available_bytes - incoming_data_size; if (used_pct >= config::storage_flood_stage_usage_percent / 100.0 && left_bytes <= config::storage_flood_stage_left_capacity_bytes) { @@ -285,4 +327,24 @@ bool SpillDataDir::reach_capacity_limit(int64_t incoming_data_size) { } return false; } +bool SpillDataDir::reach_capacity_limit(int64_t incoming_data_size) { + std::lock_guard<std::mutex> l(_mutex); + if (_reach_disk_capacity_limit(incoming_data_size)) { + return true; + } + if (_spill_data_bytes + incoming_data_size > _spill_data_limit_bytes) { + LOG_EVERY_T(WARNING, 1) << fmt::format( + "spill data reach limit, path: {}, capacity: {}, limit: {}, used: {}, available: " + "{}, " + "incoming " + "bytes: {}", + _path, PrettyPrinter::print_bytes(_disk_capacity_bytes), + PrettyPrinter::print_bytes(_spill_data_limit_bytes), + PrettyPrinter::print_bytes(_spill_data_bytes), + PrettyPrinter::print_bytes(_available_bytes), + PrettyPrinter::print_bytes(incoming_data_size)); + return true; + } + return false; +} } // namespace doris::vectorized diff --git a/be/src/vec/spill/spill_stream_manager.h b/be/src/vec/spill/spill_stream_manager.h index f73f840458e..2d7350f775f 100644 --- a/be/src/vec/spill/spill_stream_manager.h +++ b/be/src/vec/spill/spill_stream_manager.h @@ -31,48 +31,65 @@ class RuntimeProfile; namespace vectorized { +class SpillStreamManager; class SpillDataDir { public: - SpillDataDir(const std::string& path, int64_t capacity_bytes = -1, + SpillDataDir(std::string path, int64_t capacity_bytes, TStorageMedium::type storage_medium = TStorageMedium::HDD); Status init(); const std::string& path() const { return _path; } - bool is_ssd_disk() const { return _storage_medium == TStorageMedium::SSD; } - TStorageMedium::type storage_medium() const { return _storage_medium; } // check if the capacity reach the limit after adding the incoming data // return true if limit reached, otherwise, return false. - // TODO(cmy): for now we can not precisely calculate the capacity Doris used, - // so in order to avoid running out of disk capacity, we currently use the actual - // disk available capacity and total capacity to do the calculation. - // So that the capacity Doris actually used may exceeds the user specified capacity. bool reach_capacity_limit(int64_t incoming_data_size); Status update_capacity(); - double get_usage(int64_t incoming_data_size) const { + void update_spill_data_usage(int64_t incoming_data_size) { + std::lock_guard<std::mutex> l(_mutex); + _spill_data_bytes += incoming_data_size; + } + + int64_t get_spill_data_bytes() { + std::lock_guard<std::mutex> l(_mutex); + return _spill_data_bytes; + } + + int64_t get_spill_data_limit() { + std::lock_guard<std::mutex> l(_mutex); + return _spill_data_limit_bytes; + } + +private: + bool _reach_disk_capacity_limit(int64_t incoming_data_size); + double _get_disk_usage(int64_t incoming_data_size) const { return _disk_capacity_bytes == 0 ? 
0 : (_disk_capacity_bytes - _available_bytes + incoming_data_size) / (double)_disk_capacity_bytes; } -private: + friend class SpillStreamManager; std::string _path; - // the actual available capacity of the disk of this data dir - size_t _available_bytes; + // protect _disk_capacity_bytes, _available_bytes, _spill_data_limit_bytes, _spill_data_bytes + std::mutex _mutex; // the actual capacity of the disk of this data dir size_t _disk_capacity_bytes; + int64_t _spill_data_limit_bytes = 0; + // the actual available capacity of the disk of this data dir + size_t _available_bytes = 0; + int64_t _spill_data_bytes = 0; TStorageMedium::type _storage_medium; }; class SpillStreamManager { public: - SpillStreamManager(const std::vector<StorePath>& paths); + SpillStreamManager(std::unordered_map<std::string, std::unique_ptr<vectorized::SpillDataDir>>&& + spill_store_map); Status init(); @@ -93,16 +110,6 @@ public: void gc(int64_t max_file_count); - void update_usage(const std::string& path, int64_t incoming_data_size) { - path_to_spill_data_size_[path] += incoming_data_size; - } - - static bool reach_capacity_limit(size_t size, size_t incoming_data_size) { - return size + incoming_data_size > config::spill_storage_limit; - } - - int64_t spilled_data_size(const std::string& path) { return path_to_spill_data_size_[path]; } - ThreadPool* get_spill_io_thread_pool(const std::string& path) const { const auto it = path_to_io_thread_pool_.find(path); DCHECK(it != path_to_io_thread_pool_.end()); @@ -115,13 +122,11 @@ private: void _spill_gc_thread_callback(); std::vector<SpillDataDir*> _get_stores_for_spill(TStorageMedium::type storage_medium); - std::vector<StorePath> _spill_store_paths; std::unordered_map<std::string, std::unique_ptr<SpillDataDir>> _spill_store_map; CountDownLatch _stop_background_threads_latch; std::unique_ptr<ThreadPool> async_task_thread_pool_; std::unordered_map<std::string, std::unique_ptr<ThreadPool>> path_to_io_thread_pool_; - std::unordered_map<std::string, std::atomic_int64_t> path_to_spill_data_size_; scoped_refptr<Thread> _spill_gc_thread; std::atomic_uint64_t id_ = 0; diff --git a/be/src/vec/spill/spill_writer.cpp b/be/src/vec/spill/spill_writer.cpp index f657f580074..a48fb0b1dcb 100644 --- a/be/src/vec/spill/spill_writer.cpp +++ b/be/src/vec/spill/spill_writer.cpp @@ -52,7 +52,7 @@ Status SpillWriter::close() { total_written_bytes_ += meta_.size(); COUNTER_UPDATE(write_bytes_counter_, meta_.size()); - ExecEnv::GetInstance()->spill_stream_mgr()->update_usage(data_dir_->path(), meta_.size()); + data_dir_->update_spill_data_usage(meta_.size()); RETURN_IF_ERROR(file_writer_->close()); @@ -112,21 +112,19 @@ Status SpillWriter::_write_internal(const Block& block, size_t& written_bytes) { "serialize spill data error. 
[path={}]", file_path_); } } - auto* spill_stream_mgr = ExecEnv::GetInstance()->spill_stream_mgr(); - auto splled_data_size = spill_stream_mgr->spilled_data_size(data_dir_->path()); - if (spill_stream_mgr->reach_capacity_limit(splled_data_size, buff.size())) { + if (data_dir_->reach_capacity_limit(buff.size())) { return Status::Error<ErrorCode::DISK_REACH_CAPACITY_LIMIT>( "spill data total size exceed limit, path: {}, size limit: {}, spill data " "size: {}", - data_dir_->path(), PrettyPrinter::print_bytes(config::spill_storage_limit), - PrettyPrinter::print_bytes( - spill_stream_mgr->spilled_data_size(data_dir_->path()))); + data_dir_->path(), + PrettyPrinter::print_bytes(data_dir_->get_spill_data_limit()), + PrettyPrinter::print_bytes(data_dir_->get_spill_data_bytes())); } { Defer defer {[&]() { if (status.ok()) { - spill_stream_mgr->update_usage(data_dir_->path(), buff.size()); + data_dir_->update_spill_data_usage(buff.size()); } }}; { diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java index d0c91ab9b34..26b3cf4adc9 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java @@ -1713,7 +1713,7 @@ public class SessionVariable implements Serializable, Writable { description = {"控制是否启用join算子落盘。默认为 false。", "Controls whether to enable spill to disk of join operation. " + "The default value is false."}, - needForward = true) + needForward = true, fuzzy = true) public boolean enableJoinSpill = false; @VariableMgr.VarAttr( @@ -1721,7 +1721,7 @@ public class SessionVariable implements Serializable, Writable { description = {"控制是否启用排序算子落盘。默认为 false。", "Controls whether to enable spill to disk of sort operation. " + "The default value is false."}, - needForward = true) + needForward = true, fuzzy = true) public boolean enableSortSpill = false; @VariableMgr.VarAttr( @@ -1729,14 +1729,14 @@ public class SessionVariable implements Serializable, Writable { description = {"控制是否启用聚合算子落盘。默认为 false。", "Controls whether to enable spill to disk of aggregation operation. " + "The default value is false."}, - needForward = true) + needForward = true, fuzzy = true) public boolean enableAggSpill = false; // If the memory consumption of sort node exceed this limit, will trigger spill to disk; // Set to 0 to disable; min: 128M public static final long MIN_EXTERNAL_SORT_BYTES_THRESHOLD = 2097152; @VariableMgr.VarAttr(name = EXTERNAL_SORT_BYTES_THRESHOLD, - checker = "checkExternalSortBytesThreshold", fuzzy = true) + checker = "checkExternalSortBytesThreshold", varType = VariableAnnotation.DEPRECATED) public long externalSortBytesThreshold = 0; // Set to 0 to disable; min: 128M @@ -1747,7 +1747,7 @@ public class SessionVariable implements Serializable, Writable { // The memory limit of streaming agg when spilling is enabled // NOTE: streaming agg operator will not spill to disk. 
- @VariableMgr.VarAttr(name = SPILL_STREAMING_AGG_MEM_LIMIT) + @VariableMgr.VarAttr(name = SPILL_STREAMING_AGG_MEM_LIMIT, fuzzy = true) public long spillStreamingAggMemLimit = 268435456; //256MB public static final int MIN_EXTERNAL_AGG_PARTITION_BITS = 4; @@ -1887,6 +1887,38 @@ public class SessionVariable implements Serializable, Writable { // set random 1, 10, 100, 1000, 10000 this.topnOptLimitThreshold = (int) Math.pow(10, random.nextInt(5)); + + // for spill to disk + /* + if (Config.pull_request_id > 10000) { + if (Config.pull_request_id % 2 == 1) { + this.enablePipelineXEngine = true; + this.enableJoinSpill = true; + this.enableSortSpill = true; + this.enableAggSpill = true; + + randomInt = random.nextInt(4); + switch (randomInt) { + case 0: + this.minRevocableMem = 0; + break; + case 1: + this.minRevocableMem = 1; + break; + case 2: + this.minRevocableMem = 1024 * 1024; + break; + default: + this.minRevocableMem = 100 * 1024 * 1024 * 1024; + break; + } + } else { + this.enableJoinSpill = false; + this.enableSortSpill = false; + this.enableAggSpill = false; + } + } + */ } public String printFuzzyVariables() { --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
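A note on the new spill_storage_limit semantics: below is a minimal standalone sketch (not Doris code) of how a string-valued limit such as the default "20%" resolves to a byte budget, following the SpillDataDir::update_capacity() logic and the config.h comment in this patch. The resolve_spill_limit helper, the 1 TiB disk size, and the 90% flood-stage value are illustrative assumptions; the actual implementation uses ParseUtil::parse_mem_spec, which also accepts m/M and g/G suffixes.

// Minimal sketch (assumption: spec is either "<int>%" or a plain byte count;
// the real code parses more suffixes via ParseUtil::parse_mem_spec).
#include <algorithm>
#include <cstdint>
#include <iostream>
#include <string>

int64_t resolve_spill_limit(const std::string& spec, int64_t disk_capacity_bytes,
                            double storage_flood_stage_usage_percent) {
    // Hard ceiling: spill data may never push the disk past the flood-stage watermark.
    const auto disk_use_max_bytes = static_cast<int64_t>(
            disk_capacity_bytes * storage_flood_stage_usage_percent / 100.0);

    int64_t limit_bytes = 0;
    if (!spec.empty() && spec.back() == '%') {
        const double pct = std::stod(spec.substr(0, spec.size() - 1)) / 100.0;
        // Percentage form: disk_capacity_bytes * storage_flood_stage_usage_percent * spill_storage_limit
        limit_bytes = static_cast<int64_t>(disk_capacity_bytes * pct *
                                           storage_flood_stage_usage_percent / 100.0);
    } else {
        limit_bytes = std::stoll(spec);  // plain byte count, e.g. "10737418240"
    }
    return std::min(limit_bytes, disk_use_max_bytes);
}

int main() {
    const int64_t one_tib = int64_t{1} << 40;
    // Default "20%" with an assumed 90% flood stage on a 1 TiB disk -> ~184 GiB.
    std::cout << resolve_spill_limit("20%", one_tib, 90.0) << '\n';
    // An absolute value still resolves to plain bytes (10 GiB here).
    std::cout << resolve_spill_limit("10737418240", one_tib, 90.0) << '\n';
}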
