This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch spill_and_reserve
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/spill_and_reserve by this push:
new 5619ce37c84 print more message during revoke (#41365)
5619ce37c84 is described below
commit 5619ce37c84fce00b4cdf8f5f067762da4d55c41
Author: yiguolei <[email protected]>
AuthorDate: Thu Sep 26 19:12:55 2024 +0800
Print more messages during revoke (#41365)
## Proposed changes
Issue Number: close #xxx
<!--Describe your changes.-->
Co-authored-by: yiguolei <[email protected]>
---
be/src/common/status.h | 7 +++++++
be/src/runtime/query_context.cpp | 23 ++++++++++++++++++----
be/src/runtime/query_context.h | 2 ++
.../workload_group/workload_group_manager.cpp | 11 +++--------
4 files changed, 31 insertions(+), 12 deletions(-)
diff --git a/be/src/common/status.h b/be/src/common/status.h
index 64feaf12fc2..a4610d963b8 100644
--- a/be/src/common/status.h
+++ b/be/src/common/status.h
@@ -382,6 +382,11 @@ public:
_code = rhs._code;
if (rhs._err_msg) {
_err_msg = std::make_unique<ErrMsg>(*rhs._err_msg);
+ } else {
+ // If rhs error msg is empty, then should also clear current error
msg
+ // For example, if rhs is OK and current status is error, then
copy to current
+ // status, should clear current error message.
+ _err_msg.reset();
}
return *this;
}
@@ -391,6 +396,8 @@ public:
_code = rhs._code;
if (rhs._err_msg) {
_err_msg = std::move(rhs._err_msg);
+ } else {
+ _err_msg.reset();
}
return *this;
}
diff --git a/be/src/runtime/query_context.cpp b/be/src/runtime/query_context.cpp
index 7227b5704d8..51e01a071bb 100644
--- a/be/src/runtime/query_context.cpp
+++ b/be/src/runtime/query_context.cpp
@@ -469,8 +469,10 @@ Status QueryContext::revoke_memory() {
std::sort(tasks.begin(), tasks.end(), [](auto&& l, auto&& r) { return
l.first > r.first; });
- const auto mem_limit = query_mem_tracker->limit();
- const auto target_revoking_size = mem_limit * 0.2;
+ // Do not use memlimit, use current memory usage.
+ // For example, if current limit is 1.6G, but current used is 1G, if
reserve failed
+ // should free 200MB memory, not 300MB
+ const auto target_revoking_size = query_mem_tracker->consumption() * 0.2;
size_t revoked_size = 0;
std::vector<pipeline::PipelineTask*> chosen_tasks;
@@ -492,7 +494,7 @@ Status QueryContext::revoke_memory() {
}
LOG(INFO) << "query: " << print_id(query_context->_query_id)
- << " all revoking tasks done, resumt it.";
+ << " all revoking tasks done, resume it.";
query_context->set_memory_sufficient(true);
});
@@ -501,7 +503,7 @@ Status QueryContext::revoke_memory() {
}
LOG(INFO) << "query: " << print_id(_query_id) << " total revoked size: "
<< revoked_size
- << ", target_size: " << target_revoking_size
+ << ", target_size: " <<
PrettyPrinter::print(target_revoking_size, TUnit::BYTES)
<< ", tasks count: " << chosen_tasks.size() << "/" <<
tasks.size();
return Status::OK();
@@ -524,6 +526,19 @@ std::vector<pipeline::PipelineTask*>
QueryContext::get_revocable_tasks() const {
return tasks;
}
+std::string QueryContext::debug_string() {
+ std::lock_guard l(_paused_mutex);
+ return fmt::format(
+ "MemTracker Label={}, Used={}, Limit={}, Peak={}, running revoke
task count {}, "
+ "MemorySufficient={}, PausedReason={}",
+ query_mem_tracker->label(),
+ PrettyPrinter::print(query_mem_tracker->consumption(),
TUnit::BYTES),
+ PrettyPrinter::print(query_mem_tracker->limit(), TUnit::BYTES),
+ PrettyPrinter::print(query_mem_tracker->peak_consumption(),
TUnit::BYTES),
+ _revoking_tasks_count, _memory_sufficient_dependency->ready(),
+ _paused_reason.to_string());
+}
+
std::unordered_map<int, std::vector<std::shared_ptr<TRuntimeProfileTree>>>
QueryContext::_collect_realtime_query_profile() const {
std::unordered_map<int, std::vector<std::shared_ptr<TRuntimeProfileTree>>>
res;
diff --git a/be/src/runtime/query_context.h b/be/src/runtime/query_context.h
index c71b7d4f85a..4ad946562bb 100644
--- a/be/src/runtime/query_context.h
+++ b/be/src/runtime/query_context.h
@@ -356,6 +356,8 @@ public:
_query_source == QuerySource::GROUP_COMMIT_LOAD;
}
+ std::string debug_string();
+
private:
int _timeout_second;
TUniqueId _query_id;
diff --git a/be/src/runtime/workload_group/workload_group_manager.cpp
b/be/src/runtime/workload_group/workload_group_manager.cpp
index e190499ec83..df65124f635 100644
--- a/be/src/runtime/workload_group/workload_group_manager.cpp
+++ b/be/src/runtime/workload_group/workload_group_manager.cpp
@@ -559,7 +559,7 @@ void
WorkloadGroupMgr::change_query_to_hard_limit(WorkloadGroupPtr wg, bool enab
}
}
// calculate per query weighted memory limit
- debug_msg = "Query Memory Summary:";
+ debug_msg = "Query Memory Summary: \n";
for (const auto& query : all_query_ctxs) {
auto query_ctx = query.second.lock();
if (!query_ctx) {
@@ -593,19 +593,14 @@ void
WorkloadGroupMgr::change_query_to_hard_limit(WorkloadGroupPtr wg, bool enab
:
wg_high_water_mark_except_load;
}
}
- debug_msg += fmt::format(
- "\n MemTracker Label={}, Used={}, Limit={}, Peak={}",
- query_ctx->get_mem_tracker()->label(),
-
PrettyPrinter::print(query_ctx->get_mem_tracker()->consumption(), TUnit::BYTES),
- PrettyPrinter::print(query_weighted_mem_limit, TUnit::BYTES),
-
PrettyPrinter::print(query_ctx->get_mem_tracker()->peak_consumption(),
- TUnit::BYTES));
+ debug_msg += query_ctx->debug_string() + "\n";
// If the query is a pure load task, then should not modify its limit.
Or it will reserve
// memory failed and we did not hanle it.
if (!query_ctx->is_pure_load_task()) {
query_ctx->set_mem_limit(query_weighted_mem_limit);
}
}
+ //LOG(INFO) << debug_msg;
LOG_EVERY_T(INFO, 60) << debug_msg;
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]