This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch spill_and_reserve
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/spill_and_reserve by this push:
new ae974c9435b [refactor] refactor spill strategy (#41195)
ae974c9435b is described below
commit ae974c9435b7f73c1afd5dce81958e0c0b65a5d2
Author: yiguolei <[email protected]>
AuthorDate: Tue Sep 24 11:57:13 2024 +0800
[refactor] refactor spill strategy (#41195)
## Proposed changes
Issue Number: close #xxx
<!--Describe your changes.-->
Co-authored-by: yiguolei <[email protected]>
---
be/src/common/status.h | 3 +
be/src/pipeline/pipeline_task.cpp | 17 +-
be/src/runtime/memory/thread_mem_tracker_mgr.h | 6 +-
be/src/runtime/query_context.cpp | 4 +
be/src/runtime/query_context.h | 24 ++
be/src/runtime/workload_group/workload_group.h | 5 +
.../workload_group/workload_group_manager.cpp | 312 +++++++++------------
.../workload_group/workload_group_manager.h | 6 +-
8 files changed, 184 insertions(+), 193 deletions(-)
diff --git a/be/src/common/status.h b/be/src/common/status.h
index e95b9343167..c6e2712dc9c 100644
--- a/be/src/common/status.h
+++ b/be/src/common/status.h
@@ -130,6 +130,9 @@ namespace ErrorCode {
E(BAD_CAST, -254, true); \
E(ARITHMETIC_OVERFLOW_ERRROR, -255, false); \
E(PERMISSION_DENIED, -256, false); \
+    E(QUERY_MEMORY_EXCEED, -257, false);                  \
+    E(WORKLOAD_GROUP_MEMORY_EXCEED, -258, false);         \
+    E(PROCESS_MEMORY_EXCEED, -259, false);                \
E(CE_CMD_PARAMS_ERROR, -300, true); \
E(CE_BUFFER_TOO_SMALL, -301, true); \
E(CE_CMD_NOT_VALID, -302, true); \
diff --git a/be/src/pipeline/pipeline_task.cpp
b/be/src/pipeline/pipeline_task.cpp
index f7e8da9f948..28ca578757d 100644
--- a/be/src/pipeline/pipeline_task.cpp
+++ b/be/src/pipeline/pipeline_task.cpp
@@ -386,12 +386,14 @@ Status PipelineTask::execute(bool* eos) {
if (workload_group && reserve_size > 0) {
auto st = thread_context()->try_reserve_memory(reserve_size);
if (!st.ok()) {
- VLOG_DEBUG << "query: " << print_id(query_id)
- << ", try to reserve: " << reserve_size <<
"(sink reserve size:("
- << sink_reserve_size << " )"
- << ", sink name: " << _sink->get_name()
- << ", node id: " << _sink->node_id() << "
failed: " << st.to_string()
- << ", debug info: " <<
GlobalMemoryArbitrator::process_mem_log_str();
+ LOG(INFO) << "query: " << print_id(query_id)
+ << ", try to reserve: " << reserve_size <<
"(sink reserve size:("
+ << sink_reserve_size << " )"
+ << ", sink name: " << _sink->get_name()
+ << ", node id: " << _sink->node_id() << "
failed: " << st.to_string()
+ << ", debug info: " <<
GlobalMemoryArbitrator::process_mem_log_str();
+
+ _state->get_query_ctx()->update_paused_reason(st);
_state->get_query_ctx()->set_low_memory_mode();
bool is_high_wartermark = false;
bool is_low_wartermark = false;
@@ -401,7 +403,7 @@ Status PipelineTask::execute(bool* eos) {
/// If the available memory for revoking is large enough,
here trigger revoking proactively.
bool need_to_pause = false;
const auto revocable_mem_size =
_sink->revocable_mem_size(_state);
- if (revocable_mem_size > 1024L * 1024 * 1024) {
+ if (revocable_mem_size > 100L * 1024 * 1024) {
LOG(INFO) << "query: " << print_id(query_id)
<< ", task id: " << _state->task_id()
<< " has big memory to revoke: " <<
revocable_mem_size;
@@ -410,7 +412,6 @@ Status PipelineTask::execute(bool* eos) {
} else {
need_to_pause = is_low_wartermark ||
is_high_wartermark;
}
-
if (need_to_pause) {
_memory_sufficient_dependency->block();
ExecEnv::GetInstance()->workload_group_mgr()->add_paused_query(
diff --git a/be/src/runtime/memory/thread_mem_tracker_mgr.h
b/be/src/runtime/memory/thread_mem_tracker_mgr.h
index fd14750d8b8..01df735fff1 100644
--- a/be/src/runtime/memory/thread_mem_tracker_mgr.h
+++ b/be/src/runtime/memory/thread_mem_tracker_mgr.h
@@ -290,7 +290,7 @@ inline doris::Status
ThreadMemTrackerMgr::try_reserve(int64_t size) {
"reserve memory failed, size: {}, because memory tracker
consumption: {}, limit: "
"{}",
size, _limiter_tracker->consumption(),
_limiter_tracker->limit());
- return doris::Status::MemoryLimitExceeded(err_msg);
+ return doris::Status::Error<ErrorCode::QUERY_MEMORY_EXCEED>(err_msg);
}
auto wg_ptr = _wg_wptr.lock();
if (wg_ptr) {
@@ -299,7 +299,7 @@ inline doris::Status
ThreadMemTrackerMgr::try_reserve(int64_t size) {
wg_ptr->memory_debug_string());
_limiter_tracker->release(size); // rollback
_limiter_tracker->release_reserved(size); // rollback
- return doris::Status::MemoryLimitExceeded(err_msg);
+ return
doris::Status::Error<ErrorCode::WORKLOAD_GROUP_MEMORY_EXCEED>(err_msg);
}
}
if (!doris::GlobalMemoryArbitrator::try_reserve_process_memory(size)) {
@@ -310,7 +310,7 @@ inline doris::Status
ThreadMemTrackerMgr::try_reserve(int64_t size) {
if (wg_ptr) {
wg_ptr->sub_wg_refresh_interval_memory_growth(size); // rollback
}
- return doris::Status::MemoryLimitExceeded(err_msg);
+ return doris::Status::Error<ErrorCode::PROCESS_MEMORY_EXCEED>(err_msg);
}
_reserved_mem += size;
return doris::Status::OK();
diff --git a/be/src/runtime/query_context.cpp b/be/src/runtime/query_context.cpp
index 47d750978be..2932bb708ec 100644
--- a/be/src/runtime/query_context.cpp
+++ b/be/src/runtime/query_context.cpp
@@ -224,6 +224,10 @@ void QueryContext::set_execution_dependency_ready() {
void QueryContext::set_memory_sufficient(bool sufficient) {
if (sufficient) {
+ {
+ std::lock_guard l(_paused_mutex);
+ _paused_reason = Status::OK();
+ }
_memory_sufficient_dependency->set_ready();
} else {
_memory_sufficient_dependency->block();
diff --git a/be/src/runtime/query_context.h b/be/src/runtime/query_context.h
index 030de735245..471f82ba807 100644
--- a/be/src/runtime/query_context.h
+++ b/be/src/runtime/query_context.h
@@ -321,6 +321,27 @@ public:
bool low_memory_mode() { return _low_memory_mode; }
+ void update_paused_reason(const Status& st) {
+ std::lock_guard l(_paused_mutex);
+ if (_paused_reason.is<ErrorCode::QUERY_MEMORY_EXCEED>()) {
+ return;
+ } else if
(_paused_reason.is<ErrorCode::WORKLOAD_GROUP_MEMORY_EXCEED>()) {
+ if (st.is<ErrorCode::QUERY_MEMORY_EXCEED>()) {
+ _paused_reason = st;
+ return;
+ } else {
+ return;
+ }
+ } else {
+ _paused_reason = st;
+ }
+ }
+
+ Status paused_reason() {
+ std::lock_guard l(_paused_mutex);
+ return _paused_reason;
+ }
+
private:
int _timeout_second;
TUniqueId _query_id;
@@ -377,6 +398,9 @@ private:
// help us manage the query.
QuerySource _query_source;
+ Status _paused_reason;
+ std::mutex _paused_mutex;
+
// when fragment of pipeline is closed, it will register its profile to
this map by using add_fragment_profile
// flatten profile of one fragment:
// Pipeline 0
diff --git a/be/src/runtime/workload_group/workload_group.h
b/be/src/runtime/workload_group/workload_group.h
index b00bbfe68b5..86e5f494f77 100644
--- a/be/src/runtime/workload_group/workload_group.h
+++ b/be/src/runtime/workload_group/workload_group.h
@@ -235,6 +235,10 @@ public:
int64_t load_buffer_limit() { return _load_buffer_limit; }
+ void update_memory_sufficent(bool is_sufficient) { _is_sufficient =
is_sufficient; }
+
+ bool memory_sufficent() const { return _is_sufficient; }
+
private:
mutable std::shared_mutex _mutex; // lock _name, _version, _cpu_share,
_memory_limit
const uint64_t _id;
@@ -245,6 +249,7 @@ private:
// If the wg's memory reached high water mark, then the load buffer
// will be restricted to this limit.
int64_t _load_buffer_limit;
+ std::atomic<bool> _is_sufficient = true;
// memory used by load memtable
int64_t _active_mem_usage = 0;
diff --git a/be/src/runtime/workload_group/workload_group_manager.cpp
b/be/src/runtime/workload_group/workload_group_manager.cpp
index 39e20db4fca..b9a7989d59e 100644
--- a/be/src/runtime/workload_group/workload_group_manager.cpp
+++ b/be/src/runtime/workload_group/workload_group_manager.cpp
@@ -34,8 +34,10 @@
namespace doris {
-PausedQuery::PausedQuery(std::shared_ptr<QueryContext> query_ctx)
- : query_ctx_(query_ctx), query_id_(print_id(query_ctx->query_id())) {
+PausedQuery::PausedQuery(std::shared_ptr<QueryContext> query_ctx, double
cache_ratio)
+ : query_ctx_(query_ctx),
+ cache_ratio_(cache_ratio),
+ query_id_(print_id(query_ctx->query_id())) {
enqueue_at = std::chrono::system_clock::now();
}
@@ -196,7 +198,8 @@ void WorkloadGroupMgr::refresh_wg_weighted_memory_limit() {
doris::GlobalMemoryArbitrator::sys_mem_available_details_str(),
PrettyPrinter::print(all_workload_groups_mem_usage, TUnit::BYTES),
weighted_memory_limit_ratio);
- LOG_EVERY_T(INFO, 60) << debug_msg;
+ // LOG_EVERY_T(INFO, 60) << debug_msg;
+ LOG(INFO) << debug_msg;
for (auto& wg : _workload_groups) {
auto wg_mem_limit = wg.second->memory_limit();
auto wg_weighted_mem_limit = int64_t(wg_mem_limit *
weighted_memory_limit_ratio);
@@ -272,7 +275,8 @@ void WorkloadGroupMgr::refresh_wg_weighted_memory_limit() {
} else {
// If low water mark is not reached, then use process memory
limit as query memory limit.
// It means it will not take effect.
- if (!is_low_wartermark) {
+            // If there are some queries in the paused list, then the limit should
take effect.
+ if (!is_low_wartermark && wg.second->memory_sufficent()) {
query_weighted_mem_limit = wg_high_water_mark_limit;
} else {
query_weighted_mem_limit =
@@ -293,7 +297,10 @@ void WorkloadGroupMgr::refresh_wg_weighted_memory_limit() {
query_ctx->set_mem_limit(query_weighted_mem_limit);
}
- LOG_EVERY_T(INFO, 60) << debug_msg;
+        // During the memory insufficient stage, we already set every query's
memlimit, so that the flag is useless any more.
+ wg.second->update_memory_sufficent(true);
+ // LOG_EVERY_T(INFO, 60) << debug_msg;
+ LOG(INFO) << debug_msg;
}
}
@@ -325,37 +332,31 @@ void WorkloadGroupMgr::add_paused_query(const
std::shared_ptr<QueryContext>& que
std::lock_guard<std::mutex> lock(_paused_queries_lock);
DCHECK(query_ctx != nullptr);
auto wg = query_ctx->workload_group();
- auto&& [it, inserted] = _paused_queries_list[wg].emplace(query_ctx);
+ auto&& [it, inserted] = _paused_queries_list[wg].emplace(
+ query_ctx,
doris::GlobalMemoryArbitrator::last_affected_cache_capacity_adjust_weighted);
if (inserted) {
- LOG(INFO) << "here insert one new paused query: " << it->query_id()
- << ", wg: " << (void*)(wg.get());
+ LOG(INFO) << "workload group " << wg->debug_string()
+ << " insert one new paused query: " << it->query_id();
}
}
/**
* Strategy 1: A revocable query should not have any running
task(PipelineTask).
- * strategy 2: If the workload group is below low water mark, we make all
queries in this wg runnable.
- * strategy 3: Pick the query which has the max revocable size to revoke
memory.
- * strategy 4: If all queries are not revocable and they all have not any
running task,
- * we choose the max memory usage query to cancel.
+ * Strategy 2: If any task in the workload group exceeds the workload group's
memlimit, then set every queryctx's memlimit.
+ * Strategy 3: If any query exceeds the process memlimit, then all
caches should be cleared.
+ * Strategy 4: If any query exceeds its own query memlimit, then spill it to
disk or cancel it.
+ * Strategy 5: If any query exceeds the process's memlimit and the cache is zero,
then spill it to disk or cancel it.
*/
void WorkloadGroupMgr::handle_paused_queries() {
std::unique_lock<std::mutex> lock(_paused_queries_lock);
- if (_paused_queries_list.empty()) {
- if
(doris::GlobalMemoryArbitrator::last_wg_trigger_cache_capacity_adjust_weighted
!= 1) {
-
doris::GlobalMemoryArbitrator::last_wg_trigger_cache_capacity_adjust_weighted =
1;
- doris::GlobalMemoryArbitrator::notify_cache_adjust_capacity();
- LOG(INFO) << "There are no queries in paused list, so that set
cache capacity to 1 now";
- }
- return;
- }
-
for (auto it = _paused_queries_list.begin(); it !=
_paused_queries_list.end();) {
auto& queries_list = it->second;
const auto& wg = it->first;
if (queries_list.empty()) {
- LOG(INFO) << "wg: " << wg->debug_string() << " has no paused
query";
+ LOG(INFO) << "wg: " << wg->debug_string()
+ << " has no paused query, update it to memory sufficent";
it = _paused_queries_list.erase(it);
+ wg->update_memory_sufficent(true);
continue;
}
@@ -364,184 +365,133 @@ void WorkloadGroupMgr::handle_paused_queries() {
wg->check_mem_used(&is_low_wartermark, &is_high_wartermark);
- if (!is_low_wartermark && !is_high_wartermark) {
- // TODO: should check if there is a large reserve size in the
query's operators
- // If it exist, then should find the query and spill it.
- LOG(INFO) << "**** there are " << queries_list.size() << " to
resume";
- for (const auto& query : queries_list) {
- LOG(INFO) << "**** resume paused query: " << query.query_id();
- auto query_ctx = query.query_ctx_.lock();
- if (query_ctx != nullptr) {
- query_ctx->set_memory_sufficient(true);
- }
- }
-
- queries_list.clear();
- it = _paused_queries_list.erase(it);
- continue;
- } else {
- ++it;
- }
-
- // If the wg's query list is empty, then should do nothing
- if (queries_list.empty()) {
- continue;
- }
-
- // TODO: should check buffer type memory first, if could release many
these memory, then not need do spill disk
- // Buffer Memory are:
- // 1. caches: page cache, segment cache...
- // 2. memtables: load memtable
- // 3. scan queue, exchange sink buffer, union queue
- // 4. streaming aggs.
- // If we could not recycle memory from these buffers(< 10%), then do
spill disk.
-
- // 1. Check cache used, if cache is larger than > 0, then just return
and wait for it to 0 to release some memory.
- if
(doris::GlobalMemoryArbitrator::last_affected_cache_capacity_adjust_weighted >
0 &&
-
doris::GlobalMemoryArbitrator::last_wg_trigger_cache_capacity_adjust_weighted >
0) {
-
doris::GlobalMemoryArbitrator::last_wg_trigger_cache_capacity_adjust_weighted =
0;
- doris::GlobalMemoryArbitrator::notify_cache_adjust_capacity();
- LOG(INFO) << "There are some queries need memory, so that set
cache capacity to 0 now";
- // If there is cache, then return, only check to do spill disk
when cache is larger than 0.
- return;
- }
-
- // 2. If memtable size larger than 10% of wg's limit, then flush
memtable and wait.
- MemTableMemoryLimiter* memtable_limiter =
- doris::ExecEnv::GetInstance()->memtable_memory_limiter();
- // Not use memlimit, should use high water mark.
- int64_t memtable_active_bytes = 0;
- int64_t memtable_write_bytes = 0;
- int64_t memtable_flush_bytes = 0;
- wg->get_load_mem_usage(&memtable_active_bytes, &memtable_write_bytes,
- &memtable_flush_bytes);
- // TODO: should add a signal in memtable limiter to prevent new batch
- // For example, streamload, it will not reserve many memory, but it
will occupy many memtable memory.
- // TODO: 0.2 should be a workload group properties. For example, the
group is optimized for load,then the value
- // should be larged, if the group is optimized for query, then the
value should be smaller.
- int64_t max_wg_memtable_bytes = wg->load_buffer_limit();
- if (memtable_active_bytes + memtable_write_bytes +
memtable_flush_bytes >
- max_wg_memtable_bytes) {
- // There are many table in flush queue, just waiting them flush
finished.
- if (memtable_active_bytes < (int64_t)(max_wg_memtable_bytes *
0.8)) {
- LOG_EVERY_T(INFO, 60)
- << wg->name() << " load memtable size is: " <<
memtable_active_bytes << ", "
- << memtable_write_bytes << ", " << memtable_flush_bytes
- << ", wait for flush finished to release more memory";
- continue;
- } else {
- // Flush some memtables(currently written) to flush queue.
- memtable_limiter->flush_workload_group_memtables(
- wg->id(), memtable_active_bytes -
(int64_t)(max_wg_memtable_bytes * 0.8));
- LOG_EVERY_T(INFO, 60)
- << wg->name() << " load memtable size is: " <<
memtable_active_bytes << ", "
- << memtable_write_bytes << ", " << memtable_flush_bytes
- << ", flush half of active memtable to revoke memory";
- continue;
- }
- }
-
- // If the wg enable memory overcommit, then not spill, just cancel
query.
- // if (wg->enable_memory_overcommit()) {
- // TODO should cancel top query here.
- // continue;
- //}
-
- std::shared_ptr<QueryContext> max_revocable_query;
- std::shared_ptr<QueryContext> max_memory_usage_query;
- std::shared_ptr<QueryContext> running_query;
- bool has_running_query = false;
- size_t max_revocable_size = 0;
- size_t max_memory_usage = 0;
- auto it_to_remove = queries_list.end();
-
+ // If the query is paused because its limit exceed the query itself's
memlimit, then just spill disk.
+ // The query's memlimit is set using slot mechanism and its value is
set using the user settings, not
+ // by weighted value. So if reserve failed, then it is actually exceed
limit.
for (auto query_it = queries_list.begin(); query_it !=
queries_list.end();) {
- const auto query_ctx = query_it->query_ctx_.lock();
+ auto query_ctx = query_it->query_ctx_.lock();
// The query is finished during in paused list.
if (query_ctx == nullptr) {
query_it = queries_list.erase(query_it);
continue;
}
- size_t revocable_size = 0;
- size_t memory_usage = 0;
- bool has_running_task = false;
-
if (query_ctx->is_cancelled()) {
LOG(INFO) << "query: " << print_id(query_ctx->query_id())
<< "was canceled, remove from paused list";
query_it = queries_list.erase(query_it);
continue;
}
-
- query_ctx->get_revocable_info(&revocable_size, &memory_usage,
&has_running_task);
- if (has_running_task) {
- has_running_query = true;
- running_query = query_ctx;
- break;
- } else if (revocable_size > max_revocable_size) {
- max_revocable_query = query_ctx;
- max_revocable_size = revocable_size;
- it_to_remove = query_it;
- } else if (memory_usage > max_memory_usage) {
- max_memory_usage_query = query_ctx;
- max_memory_usage = memory_usage;
- it_to_remove = query_it;
- }
-
- ++query_it;
- }
-
- if (has_running_query) {
- LOG(INFO) << "has running task, query: " <<
print_id(running_query->query_id());
- } else if (max_revocable_query) {
- queries_list.erase(it_to_remove);
- queries_list.insert(queries_list.begin(), max_revocable_query);
-
- auto revocable_tasks = max_revocable_query->get_revocable_tasks();
- DCHECK(!revocable_tasks.empty());
-
- LOG(INFO) << "query: " <<
print_id(max_revocable_query->query_id()) << ", has "
- << revocable_tasks.size()
- << " tasks to revoke memory, max revocable size: " <<
max_revocable_size;
- SCOPED_ATTACH_TASK(max_revocable_query.get());
- for (auto* task : revocable_tasks) {
- auto st = task->revoke_memory();
- if (!st.ok()) {
- max_revocable_query->cancel(st);
- break;
+ if
(query_ctx->paused_reason().is<ErrorCode::QUERY_MEMORY_EXCEED>()) {
+ bool spill_res = spill_or_cancel_query(query_ctx,
query_ctx->paused_reason());
+ if (!spill_res) {
+ ++query_it;
+ continue;
+ } else {
+ query_it = queries_list.erase(query_it);
+ continue;
}
- }
- } else if (max_memory_usage_query) {
- bool new_is_low_wartermark = false;
- bool new_is_high_wartermark = false;
- const auto query_id = print_id(max_memory_usage_query->query_id());
- wg->check_mem_used(&new_is_low_wartermark,
&new_is_high_wartermark);
- if (new_is_high_wartermark) {
- if (it_to_remove->elapsed_time() < 2000) {
- LOG(INFO) << "memory insufficient and cannot find
revocable query, "
- "the max usage query: "
- << query_id << ", usage: " << max_memory_usage
- << ", elapsed: " << it_to_remove->elapsed_time()
- << ", wg info: " << wg->debug_string();
+ } else if
(query_ctx->paused_reason().is<ErrorCode::WORKLOAD_GROUP_MEMORY_EXCEED>()) {
+ if (wg->memory_sufficent()) {
+ wg->update_memory_sufficent(false);
+ LOG(INFO) << "query: " << print_id(query_ctx->query_id())
+ << " reserve memory failed due to workload group
memory exceed, "
+ "should set the workload group work in memory
insufficent mode, "
+ "so that other query will reduce their
memory. wg: "
+ << wg->debug_string();
+ }
+ // Should not put the query back to task scheduler
immediately, because when wg's memory not sufficient,
+ // and then set wg's flag, other query may not free memory
very quickly.
+ if (query_it->elapsed_time() > 1000) {
+                    // set wg's memory to insufficient, then add it back to the
task scheduler to run.
+ query_ctx->set_memory_sufficient(true);
+ query_it = queries_list.erase(query_it);
+ } else {
+ ++query_it;
+ }
+ continue;
+ } else {
+                // If wg's memlimit is not exceeded, but process memory is exceeded, it
means cache or other metadata
+                // used too much memory. Should clean all caches here.
+ // 1. Check cache used, if cache is larger than > 0, then just
return and wait for it to 0 to release some memory.
+ if
(doris::GlobalMemoryArbitrator::last_affected_cache_capacity_adjust_weighted >
+ 0.001 &&
+
doris::GlobalMemoryArbitrator::last_wg_trigger_cache_capacity_adjust_weighted >
+ 0.001) {
+
doris::GlobalMemoryArbitrator::last_wg_trigger_cache_capacity_adjust_weighted =
+ 0;
+
doris::GlobalMemoryArbitrator::notify_cache_adjust_capacity();
+ LOG(INFO) << "There are some queries need process memory,
so that set cache "
+ "capacity "
+ "to 0 now";
+ }
+ if (query_it->cache_ratio_ < 0.001) {
+ bool spill_res = spill_or_cancel_query(query_ctx,
query_ctx->paused_reason());
+ if (!spill_res) {
+ ++query_it;
+ continue;
+ } else {
+ query_it = queries_list.erase(query_it);
+ continue;
+ }
+ }
+ if
(doris::GlobalMemoryArbitrator::last_affected_cache_capacity_adjust_weighted <
+ 0.001 &&
+ query_it->cache_ratio_ > 0.001) {
+ query_ctx->set_memory_sufficient(true);
+ query_it = queries_list.erase(query_it);
continue;
}
- max_memory_usage_query->cancel(Status::InternalError(
- "memory insufficient and cannot find revocable query,
cancel "
- "the "
- "biggest usage({}) query({})",
- max_memory_usage, query_id));
- queries_list.erase(it_to_remove);
+ ++query_it;
+ }
+ }
- } else {
- LOG(INFO) << "non high water mark, resume "
- "the query: "
- << query_id << ", usage: " << max_memory_usage
- << ", wg info: " << wg->debug_string();
- max_memory_usage_query->set_memory_sufficient(true);
- queries_list.erase(it_to_remove);
+ // Finished deal with one workload group, and should deal with next
one.
+ ++it;
+ }
+}
+
+bool WorkloadGroupMgr::spill_or_cancel_query(std::shared_ptr<QueryContext>
query_ctx,
+ Status paused_reason) {
+ // TODO: If the query is an insert into select query, should consider
memtable as revoke memory.
+ size_t revocable_size = 0;
+ size_t memory_usage = 0;
+ bool has_running_task = false;
+ query_ctx->get_revocable_info(&revocable_size, &memory_usage,
&has_running_task);
+ if (has_running_task) {
+ LOG(INFO) << "query: " << print_id(query_ctx->query_id())
+ << "is paused, but still has running task, skip it.";
+ return false;
+ }
+
+ auto revocable_tasks = query_ctx->get_revocable_tasks();
+ if (revocable_tasks.empty()) {
+ if (paused_reason.is<ErrorCode::QUERY_MEMORY_EXCEED>()) {
+ // Use MEM_LIMIT_EXCEEDED so that FE could parse the error code
and do try logic
+
query_ctx->cancel(doris::Status::Error<ErrorCode::MEM_LIMIT_EXCEEDED>(
+ "query reserve memory failed, but could not find memory
that "
+ "could "
+ "release or spill to disk"));
+ } else {
+
query_ctx->cancel(doris::Status::Error<ErrorCode::MEM_LIMIT_EXCEEDED>(
+ "The query reserved memory failed because process limit
exceeded, and "
+ "there is no cache now. And could not find task to spill.
Maybe you should set "
+ "the workload group's limit to a lower value."));
+ }
+ } else {
+ SCOPED_ATTACH_TASK(query_ctx.get());
+ // TODO, should spill the task that has max memory, not all
+ for (auto* task : revocable_tasks) {
+ auto st = task->revoke_memory();
+ if (!st.ok()) {
+ query_ctx->cancel(st);
+ break;
}
}
+ LOG(INFO) << "query: " << print_id(query_ctx->query_id()) << ", has "
+ << revocable_tasks.size()
+ << " tasks to revoke memory, revocable size: " <<
revocable_size;
}
+ return true;
}
void WorkloadGroupMgr::stop() {
diff --git a/be/src/runtime/workload_group/workload_group_manager.h
b/be/src/runtime/workload_group/workload_group_manager.h
index 6e51a1f3767..f84bf3a29ff 100644
--- a/be/src/runtime/workload_group/workload_group_manager.h
+++ b/be/src/runtime/workload_group/workload_group_manager.h
@@ -44,8 +44,9 @@ public:
std::weak_ptr<QueryContext> query_ctx_;
std::chrono::system_clock::time_point enqueue_at;
size_t last_mem_usage {0};
+ double cache_ratio_ {0.0};
- PausedQuery(std::shared_ptr<QueryContext> query_ctx);
+ PausedQuery(std::shared_ptr<QueryContext> query_ctx, double cache_ratio);
int64_t elapsed_time() const {
auto now = std::chrono::system_clock::now();
@@ -101,6 +102,9 @@ public:
void update_load_memtable_usage(const std::map<uint64_t, MemtableUsage>&
wg_memtable_usages);
+private:
+ bool spill_or_cancel_query(std::shared_ptr<QueryContext> query_ctx, Status
paused_reason);
+
private:
std::shared_mutex _group_mutex;
std::unordered_map<uint64_t, WorkloadGroupPtr> _workload_groups;
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]