This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch spill_and_reserve
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/spill_and_reserve by this push:
new 8e7aa3dd3a9 f
8e7aa3dd3a9 is described below
commit 8e7aa3dd3a9f86949ddca820a72bb102f93c78e4
Author: yiguolei <[email protected]>
AuthorDate: Fri Sep 27 19:44:41 2024 +0800
f
---
be/src/runtime/memory/memory_reclamation.cpp | 2 +-
be/src/runtime/memory/thread_mem_tracker_mgr.h | 2 +-
be/src/runtime/query_context.cpp | 2 +-
be/src/runtime/query_context.h | 7 ++
.../workload_group/workload_group_manager.cpp | 118 ++++++++++++++++-----
.../workload_group/workload_group_manager.h | 2 +-
6 files changed, 104 insertions(+), 29 deletions(-)
diff --git a/be/src/runtime/memory/memory_reclamation.cpp
b/be/src/runtime/memory/memory_reclamation.cpp
index 17f5a41f462..4a4f73ee098 100644
--- a/be/src/runtime/memory/memory_reclamation.cpp
+++ b/be/src/runtime/memory/memory_reclamation.cpp
@@ -213,7 +213,7 @@ int64_t
MemoryReclamation::tg_enable_overcommit_group_gc(int64_t request_free_me
int64_t total_free_memory = 0;
bool gc_all_exceeded = request_free_memory >= total_exceeded_memory;
std::string log_prefix = fmt::format(
- "work load group that enable overcommit, number of group: {},
request_free_memory:{}, "
+ "workload group that enable overcommit, number of group: {},
request_free_memory:{}, "
"total_exceeded_memory:{}",
task_groups.size(), request_free_memory, total_exceeded_memory);
if (gc_all_exceeded) {
diff --git a/be/src/runtime/memory/thread_mem_tracker_mgr.h
b/be/src/runtime/memory/thread_mem_tracker_mgr.h
index c9f85258d5b..760fb992b3b 100644
--- a/be/src/runtime/memory/thread_mem_tracker_mgr.h
+++ b/be/src/runtime/memory/thread_mem_tracker_mgr.h
@@ -290,7 +290,7 @@ inline doris::Status
ThreadMemTrackerMgr::try_reserve(int64_t size) {
// wg mgr will change wg's hard limit property.
if (wg_ptr != nullptr && wg_ptr->enable_memory_overcommit() &&
!wg_ptr->has_changed_to_hard_limit()) {
- // TODO: Only do a check here, do not real reserve. If we could
reserve it, it is better, but the logic is too complicated.
+ // Only do a check here, do not real reserve. If we could reserve it,
it is better, but the logic is too complicated.
if (!doris::GlobalMemoryArbitrator::try_reserve_process_memory(size)) {
return doris::Status::Error<ErrorCode::PROCESS_MEMORY_EXCEEDED>(
"reserve memory failed, size: {}, because {}", size,
diff --git a/be/src/runtime/query_context.cpp b/be/src/runtime/query_context.cpp
index af4cd3d417d..abc31d967bd 100644
--- a/be/src/runtime/query_context.cpp
+++ b/be/src/runtime/query_context.cpp
@@ -472,7 +472,7 @@ Status QueryContext::revoke_memory() {
// Do not use memlimit, use current memory usage.
// For example, if current limit is 1.6G, but current used is 1G, if
reserve failed
// should free 200MB memory, not 300MB
- const auto target_revoking_size = query_mem_tracker->consumption() * 0.2;
+ const int64_t target_revoking_size =
(int64_t)(query_mem_tracker->consumption() * 0.2);
size_t revoked_size = 0;
std::vector<pipeline::PipelineTask*> chosen_tasks;
diff --git a/be/src/runtime/query_context.h b/be/src/runtime/query_context.h
index 299e4ced55c..6ce09b7dd0e 100644
--- a/be/src/runtime/query_context.h
+++ b/be/src/runtime/query_context.h
@@ -247,6 +247,12 @@ public:
int64_t get_mem_limit() const { return query_mem_tracker->limit(); }
+ void set_expected_mem_limit(int64_t new_mem_limit) {
+ _expected_mem_limit = std::min<int64_t>(new_mem_limit,
_user_set_mem_limit);
+ }
+
+ int64_t expected_mem_limit() { return _expected_mem_limit; }
+
std::shared_ptr<MemTrackerLimiter>& get_mem_tracker() { return
query_mem_tracker; }
int32_t get_slot_count() const {
@@ -411,6 +417,7 @@ private:
std::atomic<bool> _low_memory_mode = false;
int64_t _user_set_mem_limit = 0;
+ std::atomic<int64_t> _expected_mem_limit = 0;
std::mutex _profile_mutex;
timespec _query_arrival_timestamp;
diff --git a/be/src/runtime/workload_group/workload_group_manager.cpp
b/be/src/runtime/workload_group/workload_group_manager.cpp
index 287a6b45729..bee4a194a6c 100644
--- a/be/src/runtime/workload_group/workload_group_manager.cpp
+++ b/be/src/runtime/workload_group/workload_group_manager.cpp
@@ -212,7 +212,7 @@ void WorkloadGroupMgr::refresh_wg_weighted_memory_limit() {
weighted_memory_limit_ratio);
LOG_EVERY_T(INFO, 60) << debug_msg;
for (auto& wg : _workload_groups) {
- change_query_to_hard_limit(wg.second, false);
+ update_queries_limit(wg.second, false);
}
}
@@ -280,6 +280,7 @@ void WorkloadGroupMgr::handle_paused_queries() {
void WorkloadGroupMgr::handle_non_overcommit_wg_paused_queries() {
const int64_t TIMEOUT_IN_QUEUE = 1000L * 10;
std::unique_lock<std::mutex> lock(_paused_queries_lock);
+ std::vector<std::weak_ptr<QueryContext>> resume_after_gc;
for (auto it = _paused_queries_list.begin(); it !=
_paused_queries_list.end();) {
auto& queries_list = it->second;
const auto& wg = it->first;
@@ -310,9 +311,9 @@ void
WorkloadGroupMgr::handle_non_overcommit_wg_paused_queries() {
query_it = queries_list.erase(query_it);
continue;
}
-
+ bool wg_changed_to_hard_limit = wg->has_changed_to_hard_limit();
// Only deal with non overcommit workload group.
- if (wg->enable_memory_overcommit() &&
!wg->has_changed_to_hard_limit() &&
+ if (wg->enable_memory_overcommit() && !wg_changed_to_hard_limit &&
!query_ctx->paused_reason().is<ErrorCode::PROCESS_MEMORY_EXCEEDED>()) {
// Soft limit wg will only reserve failed when process limit
exceed. But in some corner case,
// when reserve, the wg is hard limit, the query reserve
failed, but when this loop run
@@ -328,8 +329,10 @@ void
WorkloadGroupMgr::handle_non_overcommit_wg_paused_queries() {
}
if
(query_ctx->paused_reason().is<ErrorCode::QUERY_MEMORY_EXCEEDED>()) {
- bool spill_res = handle_single_query(query_ctx,
query_it->reserve_size_,
-
query_ctx->paused_reason());
+ CHECK(!wg->enable_memory_overcommit() ||
wg_changed_to_hard_limit);
+ // Streamload, kafka load, group commit will never have query
memory exceeded error because
+ // their query limit is very large.
+ bool spill_res = handle_single_query(query_ctx,
query_it->reserve_size_, query_ctx->paused_reason());
if (!spill_res) {
++query_it;
continue;
@@ -338,8 +341,23 @@ void
WorkloadGroupMgr::handle_non_overcommit_wg_paused_queries() {
continue;
}
} else if
(query_ctx->paused_reason().is<ErrorCode::WORKLOAD_GROUP_MEMORY_EXCEEDED>()) {
+ CHECK(!wg->enable_memory_overcommit() ||
wg_changed_to_hard_limit);
+ // check if the reserve is too large, if it is too large,
+ // should set the query's limit only.
+ // Check the query's reserve with expected limit.
+ if (query_ctx->expected_mem_limit() <
+ query_ctx->get_mem_tracker()->consumption() +
query_it->reserve_size_) {
+ query_ctx->set_mem_limit(query_ctx->expected_mem_limit());
+ query_ctx->set_memory_sufficient(true);
+ LOG(INFO) << "workload group memory reserve failed because
"
+ << query_ctx->debug_string() << " reserve size "
+ << query_it->reserve_size_ << " is too large,
set hard limit to "
+ << query_ctx->expected_mem_limit() << " and
resume running.";
+ query_it = queries_list.erase(query_it);
+ continue;
+ }
if (!has_changed_hard_limit) {
- change_query_to_hard_limit(wg, true);
+ update_queries_limit(wg, true);
has_changed_hard_limit = true;
LOG(INFO) << "query: " << print_id(query_ctx->query_id())
<< " reserve memory failed due to workload group
memory exceed, "
@@ -412,15 +430,34 @@ void
WorkloadGroupMgr::handle_non_overcommit_wg_paused_queries() {
"to 0 now";
}
if (query_it->cache_ratio_ < 0.001) {
- // TODO: Find other exceed limit workload group and cancel
query.
- bool spill_res = handle_single_query(query_ctx,
query_it->reserve_size_,
-
query_ctx->paused_reason());
- if (!spill_res) {
- ++query_it;
- continue;
+ if (query_it->any_wg_exceed_limit_) {
+ if (wg->enable_memory_overcommit()) {
+ if (query_it->elapsed_time() > TIMEOUT_IN_QUEUE) {
+ resume_after_gc.push_back(query_ctx);
+ query_it = queries_list.erase(query_it);
+ continue;
+ } else {
+ ++query_it;
+ continue;
+ }
+ } else {
+ // current workload group is hard limit, should
not wait other wg with
+ // soft limit, just cancel
+ resume_after_gc.push_back(query_ctx);
+ query_it = queries_list.erase(query_it);
+ continue;
+ }
} else {
- query_it = queries_list.erase(query_it);
- continue;
+ // TODO: Find other exceed limit workload group and
cancel query.
+ bool spill_res = handle_single_query(query_ctx,
query_it->reserve_size_,
+
query_ctx->paused_reason());
+ if (!spill_res) {
+ ++query_it;
+ continue;
+ } else {
+ query_it = queries_list.erase(query_it);
+ continue;
+ }
}
}
if
(doris::GlobalMemoryArbitrator::last_affected_cache_capacity_adjust_weighted <
@@ -439,8 +476,22 @@ void
WorkloadGroupMgr::handle_non_overcommit_wg_paused_queries() {
// Finished deal with one workload group, and should deal with next
one.
++it;
}
+ // TODO minor GC to release some query
+ if (!resume_after_gc.empty()) {
+ }
+ for (auto resume_it = resume_after_gc.begin(); resume_it !=
resume_after_gc.end();
+ ++resume_it) {
+ auto query_ctx = resume_it->lock();
+ if (query_ctx != nullptr) {
+ query_ctx->set_memory_sufficient(true);
+ }
+ }
}
+// streamload, kafka routine load, group commit
+// insert into select
+// select
+
void WorkloadGroupMgr::handle_overcommit_wg_paused_queries() {
std::shared_lock<std::shared_mutex> r_lock(_group_mutex);
// If there is only one workload group and it is overcommit, then do
nothing.
@@ -448,19 +499,24 @@ void
WorkloadGroupMgr::handle_overcommit_wg_paused_queries() {
if (_workload_groups.size() == 1) {
return;
}
- if (doris::GlobalMemoryArbitrator::is_exceed_soft_mem_limit(100 * 1024 *
1024)) {
+ // soft_limit - 10%, will change workload group to hard limit.
+ // soft limit, process memory reserve failed.
+ // hard limit, FullGC will kill query randomly.
+ if (doris::GlobalMemoryArbitrator::is_exceed_soft_mem_limit(
+ (int64_t)(MemInfo::mem_limit() * 0.1))) {
for (auto& [wg_id, wg] : _workload_groups) {
if (wg->enable_memory_overcommit() &&
!wg->has_changed_to_hard_limit()) {
wg->change_to_hard_limit(true);
- LOG(INFO) << "Process memory usage will exceed soft limit,
change all workload "
+ LOG(INFO) << "Process memory usage + 10% will exceed soft
limit, change all "
+ "workload "
"group with overcommit to hard limit now. "
<< wg->debug_string();
}
}
}
- // If current memory usage is below soft memlimit - 10%, then enable wg's
overcommit
+ // If current memory usage is below soft memlimit - 15%, then enable wg's
overcommit
if (!doris::GlobalMemoryArbitrator::is_exceed_soft_mem_limit(
- (int64_t)(MemInfo::mem_limit() * 0.1))) {
+ (int64_t)(MemInfo::mem_limit() * 0.15))) {
for (auto& [wg_id, wg] : _workload_groups) {
if (wg->enable_memory_overcommit() &&
wg->has_changed_to_hard_limit()) {
wg->change_to_hard_limit(false);
@@ -471,6 +527,7 @@ void
WorkloadGroupMgr::handle_overcommit_wg_paused_queries() {
}
}
}
+
// If the query could release some memory, for example, spill disk, flush
memtable then the return value is true.
// If the query could not release memory, then cancel the query, the return
value is true.
// If the query is not ready to do these tasks, it means just wait.
@@ -488,6 +545,14 @@ bool
WorkloadGroupMgr::handle_single_query(std::shared_ptr<QueryContext> query_c
return false;
}
+ // During waiting all task not running, may release some memory and the
memory is enough now
+ // should resume the query.
+ if (query_ctx->get_mem_tracker()->limit() >
+ query_ctx->get_mem_tracker()->consumption() + size_to_reserve) {
+ query_ctx->set_memory_sufficient(true);
+ return true;
+ }
+
auto revocable_tasks = query_ctx->get_revocable_tasks();
if (revocable_tasks.empty()) {
if (paused_reason.is<ErrorCode::QUERY_MEMORY_EXCEEDED>()) {
@@ -518,7 +583,7 @@ bool
WorkloadGroupMgr::handle_single_query(std::shared_ptr<QueryContext> query_c
return true;
}
-void WorkloadGroupMgr::change_query_to_hard_limit(WorkloadGroupPtr wg, bool
enable_hard_limit) {
+void WorkloadGroupMgr::update_queries_limit(WorkloadGroupPtr wg, bool
enable_hard_limit) {
auto wg_mem_limit = wg->memory_limit();
auto wg_weighted_mem_limit = int64_t(wg_mem_limit * 1);
wg->set_weighted_memory_limit(wg_weighted_mem_limit);
@@ -578,6 +643,7 @@ void
WorkloadGroupMgr::change_query_to_hard_limit(WorkloadGroupPtr wg, bool enab
continue;
}
int64_t query_weighted_mem_limit = 0;
+ int64_t expected_query_weighted_mem_limit = 0;
// If the query enable hard limit, then it should not use the soft
limit
if (query_ctx->enable_query_slot_hard_limit()) {
if (total_slot_count < 1) {
@@ -589,20 +655,21 @@ void
WorkloadGroupMgr::change_query_to_hard_limit(WorkloadGroupPtr wg, bool enab
query_weighted_mem_limit =
(int64_t)((wg_high_water_mark_except_load *
query_ctx->get_slot_count() * 1.0) /
total_slot_count);
+ expected_query_weighted_mem_limit = query_weighted_mem_limit;
}
} else {
// If low water mark is not reached, then use process memory limit
as query memory limit.
// It means it will not take effect.
// If there are some query in paused list, then limit should take
effect.
+ expected_query_weighted_mem_limit =
+ total_used_slot_count > 0
+ ? (int64_t)((wg_high_water_mark_except_load +
total_used_slot_count) *
+ query_ctx->get_slot_count() * 1.0 /
total_used_slot_count)
+ : wg_high_water_mark_except_load;
if (!is_low_wartermark && !enable_hard_limit) {
query_weighted_mem_limit = wg_high_water_mark_except_load;
} else {
- query_weighted_mem_limit = total_used_slot_count > 0
- ?
(int64_t)((wg_high_water_mark_except_load +
-
total_used_slot_count) *
-
query_ctx->get_slot_count() * 1.0 /
-
total_used_slot_count)
- :
wg_high_water_mark_except_load;
+ query_weighted_mem_limit = expected_query_weighted_mem_limit;
}
}
debug_msg += query_ctx->debug_string() + "\n";
@@ -610,6 +677,7 @@ void
WorkloadGroupMgr::change_query_to_hard_limit(WorkloadGroupPtr wg, bool enab
// memory failed and we did not hanle it.
if (!query_ctx->is_pure_load_task()) {
query_ctx->set_mem_limit(query_weighted_mem_limit);
+
query_ctx->set_expected_mem_limit(expected_query_weighted_mem_limit);
}
}
//LOG(INFO) << debug_msg;
diff --git a/be/src/runtime/workload_group/workload_group_manager.h
b/be/src/runtime/workload_group/workload_group_manager.h
index 03f134006f5..d8b49fd827a 100644
--- a/be/src/runtime/workload_group/workload_group_manager.h
+++ b/be/src/runtime/workload_group/workload_group_manager.h
@@ -110,7 +110,7 @@ private:
Status paused_reason);
void handle_non_overcommit_wg_paused_queries();
void handle_overcommit_wg_paused_queries();
- void change_query_to_hard_limit(WorkloadGroupPtr wg, bool
enable_hard_limit);
+ void update_queries_limit(WorkloadGroupPtr wg, bool enable_hard_limit);
private:
std::shared_mutex _group_mutex;
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]