This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 3d62ed343c0 [improvement](spill) improve spill directory and fix bugs (#33900)
3d62ed343c0 is described below
commit 3d62ed343c0c5fd08e3224546ce07f3859539b2e
Author: TengJianPing <[email protected]>
AuthorDate: Mon Apr 22 10:10:48 2024 +0800
[improvement](spill) improve spill directory and fix bugs (#33900)
* [improvement](spill) improve spill directory and fix bugs
* fix
---
be/src/common/config.cpp | 2 +-
.../exec/partitioned_hash_join_probe_operator.cpp | 10 ++++---
.../exec/partitioned_hash_join_sink_operator.cpp | 12 +++++++-
be/src/pipeline/exec/spill_sort_sink_operator.cpp | 4 ---
be/src/pipeline/exec/spill_sort_sink_operator.h | 1 -
be/src/runtime/query_context.cpp | 3 ++
.../workload_group/workload_group_manager.cpp | 2 +-
be/src/service/doris_main.cpp | 3 ++
be/src/vec/spill/spill_stream.cpp | 5 ++++
be/src/vec/spill/spill_stream.h | 2 ++
be/src/vec/spill/spill_stream_manager.cpp | 33 ++++++++++++++++++----
be/src/vec/spill/spill_stream_manager.h | 2 ++
12 files changed, 62 insertions(+), 17 deletions(-)
diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp
index cebc51e2c74..6431a03beb2 100644
--- a/be/src/common/config.cpp
+++ b/be/src/common/config.cpp
@@ -1163,7 +1163,7 @@ DEFINE_mDouble(high_disk_avail_level_diff_usages, "0.15");
// create tablet in partition random robin idx lru size, default 10000
DEFINE_Int32(partition_disk_index_lru_size, "10000");
// limit the storage space that query spill files can use
-DEFINE_String(spill_storage_root_path, "${DORIS_HOME}/storage");
+DEFINE_String(spill_storage_root_path, "");
DEFINE_String(spill_storage_limit, "20%"); // 20%
DEFINE_mInt32(spill_gc_interval_ms, "2000"); // 2s
DEFINE_mInt32(spill_gc_file_count, "2000");
diff --git a/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp b/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp
index 0f57a03fc64..03ca1299b43 100644
--- a/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp
+++ b/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp
@@ -251,7 +251,6 @@ Status PartitionedHashJoinProbeLocalState::spill_probe_blocks(RuntimeState* stat
auto execution_context_lock = execution_context.lock();
if (!execution_context_lock) {
LOG(INFO) << "execution_context released, maybe query
was cancelled.";
- _dependency->set_ready();
return;
}
_spill_wait_in_queue_timer->update(submit_timer.elapsed_time());
@@ -325,7 +324,11 @@ Status PartitionedHashJoinProbeLocalState::recovery_build_blocks_from_disk(Runti
}
auto& mutable_block =
_shared_state->partitioned_build_blocks[partition_index];
- DCHECK(mutable_block != nullptr);
+ if (!mutable_block) {
+
ExecEnv::GetInstance()->spill_stream_mgr()->delete_spill_stream(spilled_stream);
+ spilled_stream.reset();
+ return Status::OK();
+ }
auto execution_context = state->get_task_execution_context();
_shared_state_holder = _shared_state->shared_from_this();
@@ -340,11 +343,11 @@ Status PartitionedHashJoinProbeLocalState::recovery_build_blocks_from_disk(Runti
LOG(INFO) << "execution_context released, maybe query was
cancelled.";
return;
}
+ SCOPED_ATTACH_TASK(state);
_spill_wait_in_queue_timer->update(submit_timer.elapsed_time());
SCOPED_TIMER(_recovery_build_timer);
Defer defer([this] { --_spilling_task_count; });
(void)state; // avoid ut compile error
- SCOPED_ATTACH_TASK(state);
DCHECK_EQ(_spill_status_ok.load(), true);
bool eos = false;
@@ -654,7 +657,6 @@ Status PartitionedHashJoinProbeOperatorX::pull(doris::RuntimeState* state,
if (local_state._need_to_setup_internal_operators) {
*eos = false;
bool has_data = false;
- CHECK_EQ(local_state._dependency->is_blocked_by(), nullptr);
RETURN_IF_ERROR(local_state.recovery_build_blocks_from_disk(
state, local_state._partition_cursor, has_data));
if (has_data) {
diff --git a/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp b/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp
index d0ca832630e..416d678b580 100644
--- a/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp
+++ b/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp
@@ -137,7 +137,17 @@ Status PartitionedHashJoinSinkLocalState::_revoke_unpartitioned_block(RuntimeSta
auto execution_context = state->get_task_execution_context();
_dependency->block();
- auto spill_func = [execution_context, build_block, state, this]() {
+ auto query_id = state->query_id();
+ auto mem_tracker = state->get_query_ctx()->query_mem_tracker;
+ auto spill_func = [execution_context, build_block, state, query_id,
mem_tracker,
+ this]() mutable {
+ SCOPED_ATTACH_TASK_WITH_ID(mem_tracker, query_id);
+ Defer defer {[&]() {
+ // need to reset build_block here, or else build_block will be
destructed
+ // after SCOPED_ATTACH_TASK_WITH_ID and will trigger
memory_orphan_check failure
+ build_block.reset();
+ }};
+
auto execution_context_lock = execution_context.lock();
if (!execution_context_lock) {
LOG(INFO) << "execution_context released, maybe query was
cancelled.";
diff --git a/be/src/pipeline/exec/spill_sort_sink_operator.cpp b/be/src/pipeline/exec/spill_sort_sink_operator.cpp
index fd63df92976..48b1670ca1f 100644
--- a/be/src/pipeline/exec/spill_sort_sink_operator.cpp
+++ b/be/src/pipeline/exec/spill_sort_sink_operator.cpp
@@ -137,10 +137,6 @@ Status SpillSortSinkOperatorX::open(RuntimeState* state) {
RETURN_IF_ERROR(DataSinkOperatorX<LocalStateType>::open(state));
return _sort_sink_operator->open(state);
}
-Status SpillSortSinkOperatorX::close(RuntimeState* state) {
- RETURN_IF_ERROR(DataSinkOperatorX<LocalStateType>::close(state));
- return _sort_sink_operator->close(state);
-}
Status SpillSortSinkOperatorX::revoke_memory(RuntimeState* state) {
if (!_enable_spill) {
return Status::OK();
diff --git a/be/src/pipeline/exec/spill_sort_sink_operator.h b/be/src/pipeline/exec/spill_sort_sink_operator.h
index 4604696eff2..d552d67570a 100644
--- a/be/src/pipeline/exec/spill_sort_sink_operator.h
+++ b/be/src/pipeline/exec/spill_sort_sink_operator.h
@@ -79,7 +79,6 @@ public:
Status prepare(RuntimeState* state) override;
Status open(RuntimeState* state) override;
- Status close(RuntimeState* state) override;
Status sink(RuntimeState* state, vectorized::Block* in_block, bool eos)
override;
DataDistribution required_data_distribution() const override {
return _sort_sink_operator->required_data_distribution();
diff --git a/be/src/runtime/query_context.cpp b/be/src/runtime/query_context.cpp
index d9828080ddc..081d8ca1f59 100644
--- a/be/src/runtime/query_context.cpp
+++ b/be/src/runtime/query_context.cpp
@@ -42,6 +42,7 @@
#include "runtime/workload_group/workload_group_manager.h"
#include "util/mem_info.h"
#include "util/uid_util.h"
+#include "vec/spill/spill_stream_manager.h"
namespace doris {
@@ -167,6 +168,8 @@ QueryContext::~QueryContext() {
file_scan_range_params_map.clear();
obj_pool.clear();
+ _exec_env->spill_stream_mgr()->async_cleanup_query(_query_id);
+
LOG_INFO("Query {} deconstructed, {}", print_id(this->_query_id),
mem_tracker_msg);
}
diff --git a/be/src/runtime/workload_group/workload_group_manager.cpp b/be/src/runtime/workload_group/workload_group_manager.cpp
index 2e75218e9cf..e336c9f80a8 100644
--- a/be/src/runtime/workload_group/workload_group_manager.cpp
+++ b/be/src/runtime/workload_group/workload_group_manager.cpp
@@ -183,7 +183,7 @@ void WorkloadGroupMgr::refresh_wg_memory_info() {
// in process_mem_used.
// we count these cache memories equally on workload groups.
double ratio = (double)proc_vm_rss / (double)all_queries_mem_used;
- if (ratio >= 1.25) {
+ if (ratio <= 1.25) {
auto sys_mem_available = doris::MemInfo::sys_mem_available();
std::string debug_msg = fmt::format(
"\nProcess Memory Summary: process_vm_rss: {}, process mem:
{}, sys mem available: "
diff --git a/be/src/service/doris_main.cpp b/be/src/service/doris_main.cpp
index a218c7be6f1..731e09c6be9 100644
--- a/be/src/service/doris_main.cpp
+++ b/be/src/service/doris_main.cpp
@@ -411,6 +411,9 @@ int main(int argc, char** argv) {
}
std::vector<doris::StorePath> spill_paths;
+ if (doris::config::spill_storage_root_path.empty()) {
+ doris::config::spill_storage_root_path =
doris::config::storage_root_path;
+ }
olap_res =
doris::parse_conf_store_paths(doris::config::spill_storage_root_path,
&spill_paths);
if (!olap_res) {
LOG(ERROR) << "parse config spill storage path failed, path="
diff --git a/be/src/vec/spill/spill_stream.cpp b/be/src/vec/spill/spill_stream.cpp
index f5b6fea096d..ed7be9a0b28 100644
--- a/be/src/vec/spill/spill_stream.cpp
+++ b/be/src/vec/spill/spill_stream.cpp
@@ -25,6 +25,7 @@
#include "io/fs/local_file_system.h"
#include "runtime/exec_env.h"
+#include "runtime/runtime_state.h"
#include "runtime/thread_context.h"
#include "vec/core/block.h"
#include "vec/spill/spill_reader.h"
@@ -88,6 +89,10 @@ void SpillStream::close() {
}
}
+const TUniqueId& SpillStream::query_id() const {
+ return state_->query_id();
+}
+
const std::string& SpillStream::get_spill_root_dir() const {
return data_dir_->path();
}
diff --git a/be/src/vec/spill/spill_stream.h b/be/src/vec/spill/spill_stream.h
index 6b5166f2652..68abfa9aaf7 100644
--- a/be/src/vec/spill/spill_stream.h
+++ b/be/src/vec/spill/spill_stream.h
@@ -81,6 +81,8 @@ public:
read_wait_io_timer_ = wait_io_timer;
}
+ const TUniqueId& query_id() const;
+
private:
friend class SpillStreamManager;
diff --git a/be/src/vec/spill/spill_stream_manager.cpp b/be/src/vec/spill/spill_stream_manager.cpp
index 0259bef33f3..05a2531c466 100644
--- a/be/src/vec/spill/spill_stream_manager.cpp
+++ b/be/src/vec/spill/spill_stream_manager.cpp
@@ -36,6 +36,7 @@
#include "util/pretty_printer.h"
#include "util/runtime_profile.h"
#include "util/time.h"
+#include "util/uid_util.h"
#include "vec/spill/spill_stream.h"
namespace doris::vectorized {
@@ -128,7 +129,7 @@ Status SpillStreamManager::_init_spill_store_map() {
std::vector<SpillDataDir*> SpillStreamManager::_get_stores_for_spill(
TStorageMedium::type storage_medium) {
std::vector<SpillDataDir*> stores;
- for (auto&& [_, store] : _spill_store_map) {
+ for (auto& [_, store] : _spill_store_map) {
if (store->storage_medium() == storage_medium &&
!store->reach_capacity_limit(0)) {
stores.push_back(store.get());
}
@@ -188,7 +189,7 @@ Status SpillStreamManager::register_spill_stream(RuntimeState* state, SpillStrea
for (auto& dir : data_dirs) {
data_dir = dir;
std::string spill_root_dir = fmt::format("{}/{}", data_dir->path(),
SPILL_DIR_PREFIX);
- spill_dir = fmt::format("{}/{}-{}-{}-{}-{}", spill_root_dir, query_id,
operator_name,
+ spill_dir = fmt::format("{}/{}/{}-{}-{}-{}", spill_root_dir, query_id,
operator_name,
node_id, state->task_id(), id);
auto st = io::global_local_filesystem()->create_directory(spill_dir);
if (!st.ok()) {
@@ -207,9 +208,15 @@ Status SpillStreamManager::register_spill_stream(RuntimeState* state, SpillStrea
}
void SpillStreamManager::delete_spill_stream(SpillStreamSPtr stream) {
- auto gc_dir = fmt::format("{}/{}/{}", stream->get_data_dir()->path(),
SPILL_GC_DIR_PREFIX,
-
std::filesystem::path(stream->get_spill_dir()).filename().string());
- (void)io::global_local_filesystem()->rename(stream->get_spill_dir(),
gc_dir);
+ auto query_dir = fmt::format("{}/{}/{}", stream->get_data_dir()->path(),
SPILL_GC_DIR_PREFIX,
+ print_id(stream->query_id()));
+ auto st = io::global_local_filesystem()->create_directory(query_dir);
+ if (st.ok()) {
+ auto gc_dir =
+ fmt::format("{}/{}", query_dir,
+
std::filesystem::path(stream->get_spill_dir()).filename().string());
+ (void)io::global_local_filesystem()->rename(stream->get_spill_dir(),
gc_dir);
+ }
}
void SpillStreamManager::gc(int64_t max_file_count) {
@@ -266,6 +273,22 @@ void SpillStreamManager::gc(int64_t max_file_count) {
}
}
+void SpillStreamManager::async_cleanup_query(TUniqueId query_id) {
+ (void)get_async_task_thread_pool()->submit_func([this, query_id] {
+ for (auto& [_, store] : _spill_store_map) {
+ std::string query_spill_dir =
+ fmt::format("{}/{}/{}", store->path(), SPILL_DIR_PREFIX,
print_id(query_id));
+ bool exists = false;
+ auto status =
io::global_local_filesystem()->exists(query_spill_dir, &exists);
+ if (status.ok() && exists) {
+ auto gc_dir = fmt::format("{}/{}/{}-gc", store->path(),
SPILL_GC_DIR_PREFIX,
+ print_id(query_id));
+ (void)io::global_local_filesystem()->rename(query_spill_dir,
gc_dir);
+ }
+ }
+ });
+}
+
SpillDataDir::SpillDataDir(std::string path, int64_t capacity_bytes,
TStorageMedium::type storage_medium)
: _path(std::move(path)),
diff --git a/be/src/vec/spill/spill_stream_manager.h b/be/src/vec/spill/spill_stream_manager.h
index 2d7350f775f..36062ce0b46 100644
--- a/be/src/vec/spill/spill_stream_manager.h
+++ b/be/src/vec/spill/spill_stream_manager.h
@@ -108,6 +108,8 @@ public:
// 标记SpillStream需要被删除,在GC线程中异步删除落盘文件
void delete_spill_stream(SpillStreamSPtr spill_stream);
+ void async_cleanup_query(TUniqueId query_id);
+
void gc(int64_t max_file_count);
ThreadPool* get_spill_io_thread_pool(const std::string& path) const {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]