This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch spill_and_reserve
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/spill_and_reserve by this push:
new 7f8a9c4c9c8 support stream load (#41351)
7f8a9c4c9c8 is described below
commit 7f8a9c4c9c80ecb443f46ed7876025767eb4d087
Author: yiguolei <[email protected]>
AuthorDate: Thu Sep 26 16:44:55 2024 +0800
support stream load (#41351)
## Proposed changes
Issue Number: close #xxx
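- Pure load tasks (stream load, routine load, group commit) use the process memory limit instead of a per-query memory limit, and the workload group manager no longer adjusts their memory limit or counts their slots.
- When more than one workload group exists, groups with memory overcommit are switched to hard limit while process memory is about to exceed the soft limit, and switched back once usage is at least 10% of the process memory limit below the soft limit.
- Paused queries record the reserve size that failed; workload group spill watermarks are computed from the configured memory limit instead of the weighted memory limit.
- cache_capacity_reduce_mem_limit_frac is raised from 0.6 to 0.7.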
---------
Co-authored-by: yiguolei <[email protected]>
---
be/src/common/config.cpp | 2 +-
be/src/pipeline/pipeline_task.cpp | 6 +-
be/src/runtime/memory/global_memory_arbitrator.cpp | 1 +
be/src/runtime/memory/global_memory_arbitrator.h | 1 +
be/src/runtime/memory/thread_mem_tracker_mgr.h | 31 +-
be/src/runtime/query_context.cpp | 5 +
be/src/runtime/query_context.h | 7 +
be/src/runtime/workload_group/workload_group.cpp | 22 ++
be/src/runtime/workload_group/workload_group.h | 38 +--
.../workload_group/workload_group_manager.cpp | 355 ++++++++++++++-------
.../workload_group/workload_group_manager.h | 12 +-
11 files changed, 317 insertions(+), 163 deletions(-)
diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp
index 78e68c9de64..7e9ff8d8801 100644
--- a/be/src/common/config.cpp
+++ b/be/src/common/config.cpp
@@ -96,7 +96,7 @@ DEFINE_String(mem_limit, "90%");
DEFINE_Double(soft_mem_limit_frac, "0.9");
// Cache capacity reduce mem limit as a fraction of soft mem limit.
-DEFINE_mDouble(cache_capacity_reduce_mem_limit_frac, "0.6");
+DEFINE_mDouble(cache_capacity_reduce_mem_limit_frac, "0.7");
// Schema change memory limit as a fraction of soft memory limit.
DEFINE_Double(schema_change_mem_limit_frac, "0.6");
diff --git a/be/src/pipeline/pipeline_task.cpp b/be/src/pipeline/pipeline_task.cpp
index 4a1e4536373..0028614b22a 100644
--- a/be/src/pipeline/pipeline_task.cpp
+++ b/be/src/pipeline/pipeline_task.cpp
@@ -367,7 +367,7 @@ Status PipelineTask::execute(bool* eos) {
// `_dry_run` means sink operator need no more data
// `_sink->is_finished(_state)` means sink operator should be finished
- size_t reserve_size = 0;
+ int64_t reserve_size = 0;
bool has_enough_memory = true;
if (_dry_run || _sink->is_finished(_state)) {
*eos = true;
@@ -401,7 +401,7 @@ Status PipelineTask::execute(bool* eos) {
if (is_low_wartermark || is_high_wartermark) {
_memory_sufficient_dependency->block();
ExecEnv::GetInstance()->workload_group_mgr()->add_paused_query(
- _state->get_query_ctx()->shared_from_this());
+ _state->get_query_ctx()->shared_from_this(),
reserve_size);
continue;
}
has_enough_memory = false;
@@ -439,7 +439,7 @@ Status PipelineTask::execute(bool* eos) {
<< ", insufficient memory. reserve_size: " <<
reserve_size;
_memory_sufficient_dependency->block();
ExecEnv::GetInstance()->workload_group_mgr()->add_paused_query(
- _state->get_query_ctx()->shared_from_this());
+ _state->get_query_ctx()->shared_from_this(), reserve_size);
break;
}
}
diff --git a/be/src/runtime/memory/global_memory_arbitrator.cpp b/be/src/runtime/memory/global_memory_arbitrator.cpp
index 0c774187ff3..2b649efba50 100644
--- a/be/src/runtime/memory/global_memory_arbitrator.cpp
+++ b/be/src/runtime/memory/global_memory_arbitrator.cpp
@@ -44,6 +44,7 @@ std::atomic<double> GlobalMemoryArbitrator::last_cache_capacity_adjust_weighted
std::atomic<double>
GlobalMemoryArbitrator::last_wg_trigger_cache_capacity_adjust_weighted {1};
// The value that take affect
std::atomic<double>
GlobalMemoryArbitrator::last_affected_cache_capacity_adjust_weighted {1};
+std::atomic<bool> GlobalMemoryArbitrator::any_workload_group_exceed_limit
{false};
std::mutex GlobalMemoryArbitrator::memtable_memory_refresh_lock;
std::condition_variable GlobalMemoryArbitrator::memtable_memory_refresh_cv;
std::atomic<bool> GlobalMemoryArbitrator::memtable_memory_refresh_notify
{false};
diff --git a/be/src/runtime/memory/global_memory_arbitrator.h b/be/src/runtime/memory/global_memory_arbitrator.h
index 468d442b662..e2b55c8aa98 100644
--- a/be/src/runtime/memory/global_memory_arbitrator.h
+++ b/be/src/runtime/memory/global_memory_arbitrator.h
@@ -182,6 +182,7 @@ public:
static std::atomic<double> last_wg_trigger_cache_capacity_adjust_weighted;
// The value that take affect
static std::atomic<double> last_affected_cache_capacity_adjust_weighted;
+ static std::atomic<bool> any_workload_group_exceed_limit;
static void notify_cache_adjust_capacity() {
cache_adjust_capacity_notify.store(true, std::memory_order_relaxed);
diff --git a/be/src/runtime/memory/thread_mem_tracker_mgr.h b/be/src/runtime/memory/thread_mem_tracker_mgr.h
index 15a57528491..c9f85258d5b 100644
--- a/be/src/runtime/memory/thread_mem_tracker_mgr.h
+++ b/be/src/runtime/memory/thread_mem_tracker_mgr.h
@@ -286,18 +286,27 @@ inline doris::Status ThreadMemTrackerMgr::try_reserve(int64_t size) {
// _untracked_mem store bytes that not synchronized to process reserved
memory.
flush_untracked_mem();
auto wg_ptr = _wg_wptr.lock();
- // If wg not exist or wg enable overcommit, then query's memlimit is not
considered.
- if (wg_ptr != nullptr && !wg_ptr->enable_memory_overcommit()) {
- if (!_limiter_tracker->try_reserve(size)) {
- auto err_msg = fmt::format(
- "reserve memory failed, size: {}, because memory tracker
consumption: {}, "
- "limit: "
- "{}",
- size, _limiter_tracker->consumption(),
_limiter_tracker->limit());
- return
doris::Status::Error<ErrorCode::QUERY_MEMORY_EXCEEDED>(err_msg);
+ // For wg with overcommit, the limit will only task affect when memory >
soft limit
+ // wg mgr will change wg's hard limit property.
+ if (wg_ptr != nullptr && wg_ptr->enable_memory_overcommit() &&
+ !wg_ptr->has_changed_to_hard_limit()) {
+ // TODO: Only do a check here, do not real reserve. If we could
reserve it, it is better, but the logic is too complicated.
+ if (!doris::GlobalMemoryArbitrator::try_reserve_process_memory(size)) {
+ return doris::Status::Error<ErrorCode::PROCESS_MEMORY_EXCEEDED>(
+ "reserve memory failed, size: {}, because {}", size,
+ GlobalMemoryArbitrator::process_mem_log_str());
+ } else {
+
doris::GlobalMemoryArbitrator::release_process_reserved_memory(size);
+ return Status::OK();
}
- } else {
- _limiter_tracker->reserve(size);
+ }
+ if (!_limiter_tracker->try_reserve(size)) {
+ auto err_msg = fmt::format(
+ "reserve memory failed, size: {}, because memory tracker
consumption: {}, "
+ "limit: "
+ "{}",
+ size, _limiter_tracker->consumption(),
_limiter_tracker->limit());
+ return doris::Status::Error<ErrorCode::QUERY_MEMORY_EXCEEDED>(err_msg);
}
if (wg_ptr) {
if (!wg_ptr->add_wg_refresh_interval_memory_growth(size)) {
diff --git a/be/src/runtime/query_context.cpp b/be/src/runtime/query_context.cpp
index 45fd5562a93..7227b5704d8 100644
--- a/be/src/runtime/query_context.cpp
+++ b/be/src/runtime/query_context.cpp
@@ -129,6 +129,11 @@ void QueryContext::_init_query_mem_tracker() {
<< " OR is -1. Using process memory limit instead.";
bytes_limit = MemInfo::mem_limit();
}
+ // If the query is a pure load task(streamload, routine load, group
commit), then it should not use
+ // memlimit per query to limit their memory usage.
+ if (is_pure_load_task()) {
+ bytes_limit = MemInfo::mem_limit();
+ }
if (_query_options.query_type == TQueryType::SELECT) {
query_mem_tracker = MemTrackerLimiter::create_shared(
MemTrackerLimiter::Type::QUERY, fmt::format("Query#Id={}",
print_id(_query_id)),
diff --git a/be/src/runtime/query_context.h b/be/src/runtime/query_context.h
index b74b835af63..c71b7d4f85a 100644
--- a/be/src/runtime/query_context.h
+++ b/be/src/runtime/query_context.h
@@ -303,6 +303,7 @@ public:
bool is_low_wartermark = false;
bool is_high_wartermark = false;
_workload_group->check_mem_used(&is_low_wartermark,
&is_high_wartermark);
+ // If the wg is not enable hard limit, this will also take effect
to lower down the memory usage.
if (is_high_wartermark) {
LOG(INFO)
<< "Query " << print_id(_query_id)
@@ -349,6 +350,12 @@ public:
return _paused_reason;
}
+ bool is_pure_load_task() {
+ return _query_source == QuerySource::STREAM_LOAD ||
+ _query_source == QuerySource::ROUTINE_LOAD ||
+ _query_source == QuerySource::GROUP_COMMIT_LOAD;
+ }
+
private:
int _timeout_second;
TUniqueId _query_id;
diff --git a/be/src/runtime/workload_group/workload_group.cpp b/be/src/runtime/workload_group/workload_group.cpp
index e0438a9d729..bca2f8401de 100644
--- a/be/src/runtime/workload_group/workload_group.cpp
+++ b/be/src/runtime/workload_group/workload_group.cpp
@@ -114,6 +114,28 @@ std::string WorkloadGroup::debug_string() const {
_remote_scan_bytes_per_second);
}
+bool WorkloadGroup::add_wg_refresh_interval_memory_growth(int64_t size) {
+ // If a group is enable memory overcommit, then not need check the limit
+ // It is always true, and it will only fail when process memory is not
+ // enough.
+ if (_enable_memory_overcommit) {
+ if (doris::GlobalMemoryArbitrator::is_exceed_soft_mem_limit(size)) {
+ return false;
+ } else {
+ return true;
+ }
+ }
+ auto realtime_total_mem_used =
+ _total_mem_used + _wg_refresh_interval_memory_growth.load() + size;
+ if ((realtime_total_mem_used >
+ ((double)_memory_limit *
_spill_high_watermark.load(std::memory_order_relaxed) / 100))) {
+ return false;
+ } else {
+ _wg_refresh_interval_memory_growth.fetch_add(size);
+ return true;
+ }
+}
+
std::string WorkloadGroup::memory_debug_string() const {
auto realtime_total_mem_used = _total_mem_used +
_wg_refresh_interval_memory_growth.load();
auto mem_used_ratio = realtime_total_mem_used /
((double)_weighted_memory_limit + 1);
diff --git a/be/src/runtime/workload_group/workload_group.h b/be/src/runtime/workload_group/workload_group.h
index 1d640afee1d..094258caee5 100644
--- a/be/src/runtime/workload_group/workload_group.h
+++ b/be/src/runtime/workload_group/workload_group.h
@@ -76,9 +76,9 @@ public:
int64_t memory_limit() const {
std::shared_lock<std::shared_mutex> r_lock(_mutex);
return _memory_limit;
- };
+ }
- int64_t weighted_memory_limit() const { return _weighted_memory_limit; };
+ int64_t total_mem_used() const { return _total_mem_used; }
void set_weighted_memory_limit(int64_t weighted_memory_limit) {
_weighted_memory_limit = weighted_memory_limit;
@@ -103,24 +103,7 @@ public:
return _total_query_slot_count.load(std::memory_order_relaxed);
}
- bool add_wg_refresh_interval_memory_growth(int64_t size) {
- // If a group is enable memory overcommit, then not need check the
limit
- // It is always true, and it will only fail when process memory is not
- // enough.
- if (_enable_memory_overcommit) {
- return true;
- }
- auto realtime_total_mem_used =
- _total_mem_used + _wg_refresh_interval_memory_growth.load() +
size;
- if ((realtime_total_mem_used >
- ((double)_weighted_memory_limit *
- _spill_high_watermark.load(std::memory_order_relaxed) / 100))) {
- return false;
- } else {
- _wg_refresh_interval_memory_growth.fetch_add(size);
- return true;
- }
- }
+ bool add_wg_refresh_interval_memory_growth(int64_t size);
void sub_wg_refresh_interval_memory_growth(int64_t size) {
_wg_refresh_interval_memory_growth.fetch_sub(size);
@@ -129,10 +112,10 @@ public:
void check_mem_used(bool* is_low_wartermark, bool* is_high_wartermark)
const {
auto realtime_total_mem_used = _total_mem_used +
_wg_refresh_interval_memory_growth.load();
*is_low_wartermark = (realtime_total_mem_used >
- ((double)_weighted_memory_limit *
+ ((double)_memory_limit *
_spill_low_watermark.load(std::memory_order_relaxed) / 100));
*is_high_wartermark = (realtime_total_mem_used >
- ((double)_weighted_memory_limit *
+ ((double)_memory_limit *
_spill_high_watermark.load(std::memory_order_relaxed) / 100));
}
@@ -166,6 +149,11 @@ public:
return _memory_limit > 0;
}
+ bool exceed_limit() {
+ std::shared_lock<std::shared_mutex> r_lock(_mutex);
+ return _memory_limit > 0 ? _total_mem_used > _memory_limit : false;
+ }
+
Status add_query(TUniqueId query_id, std::shared_ptr<QueryContext>
query_ctx) {
std::unique_lock<std::shared_mutex> wlock(_mutex);
if (_is_shutdown) {
@@ -236,9 +224,9 @@ public:
int64_t load_buffer_limit() { return _load_buffer_limit; }
- void update_memory_sufficent(bool is_sufficient) { _is_sufficient =
is_sufficient; }
+ bool has_changed_to_hard_limit() const { return _has_changed_hard_limit; }
- bool memory_sufficent() const { return _is_sufficient; }
+ void change_to_hard_limit(bool to_hard_limit) { _has_changed_hard_limit =
to_hard_limit; }
private:
mutable std::shared_mutex _mutex; // lock _name, _version, _cpu_share,
_memory_limit
@@ -250,7 +238,7 @@ private:
// If the wg's memory reached high water mark, then the load buffer
// will be restricted to this limit.
int64_t _load_buffer_limit;
- std::atomic<bool> _is_sufficient = true;
+ std::atomic<bool> _has_changed_hard_limit = false;
// memory used by load memtable
int64_t _active_mem_usage = 0;
diff --git a/be/src/runtime/workload_group/workload_group_manager.cpp b/be/src/runtime/workload_group/workload_group_manager.cpp
index 92235c1ded7..e190499ec83 100644
--- a/be/src/runtime/workload_group/workload_group_manager.cpp
+++ b/be/src/runtime/workload_group/workload_group_manager.cpp
@@ -38,9 +38,12 @@
namespace doris {
-PausedQuery::PausedQuery(std::shared_ptr<QueryContext> query_ctx, double
cache_ratio)
+PausedQuery::PausedQuery(std::shared_ptr<QueryContext> query_ctx, double
cache_ratio,
+ bool any_wg_exceed_limit, int64_t reserve_size)
: query_ctx_(query_ctx),
cache_ratio_(cache_ratio),
+ any_wg_exceed_limit_(any_wg_exceed_limit),
+ reserve_size_(reserve_size),
query_id_(print_id(query_ctx->query_id())) {
enqueue_at = std::chrono::system_clock::now();
}
@@ -159,11 +162,16 @@ void WorkloadGroupMgr::refresh_wg_weighted_memory_limit() {
// and calculate total memory used of all queries.
int64_t all_workload_groups_mem_usage = 0;
std::unordered_map<uint64_t, WorkloadGroupMemInfo> wgs_mem_info;
+ bool has_wg_exceed_limit = false;
for (auto& [wg_id, wg] : _workload_groups) {
wgs_mem_info[wg_id].total_mem_used =
wg->make_memory_tracker_snapshots(&wgs_mem_info[wg_id].tracker_snapshots);
all_workload_groups_mem_usage += wgs_mem_info[wg_id].total_mem_used;
+ if (wg->exceed_limit()) {
+ has_wg_exceed_limit = true;
+ }
}
+ doris::GlobalMemoryArbitrator::any_workload_group_exceed_limit =
has_wg_exceed_limit;
if (all_workload_groups_mem_usage <= 0) {
return;
}
@@ -204,105 +212,7 @@ void WorkloadGroupMgr::refresh_wg_weighted_memory_limit() {
weighted_memory_limit_ratio);
LOG_EVERY_T(INFO, 60) << debug_msg;
for (auto& wg : _workload_groups) {
- auto wg_mem_limit = wg.second->memory_limit();
- auto wg_weighted_mem_limit = int64_t(wg_mem_limit *
weighted_memory_limit_ratio);
- wg.second->set_weighted_memory_limit(wg_weighted_mem_limit);
- auto all_query_ctxs = wg.second->queries();
- bool is_low_wartermark = false;
- bool is_high_wartermark = false;
- wg.second->check_mem_used(&is_low_wartermark, &is_high_wartermark);
- int64_t wg_high_water_mark_limit =
- (int64_t)(wg_mem_limit *
wg.second->spill_threshold_high_water_mark() * 1.0 / 100);
- int64_t weighted_high_water_mark_limit =
- (int64_t)(wg_weighted_mem_limit *
wg.second->spill_threshold_high_water_mark() *
- 1.0 / 100);
- std::string debug_msg;
- if (is_high_wartermark || is_low_wartermark) {
- debug_msg = fmt::format(
- "\nWorkload Group {}: mem limit: {}, mem used: {},
weighted mem limit: {}, "
- "high water mark mem limit: {}, used ratio: {}",
- wg.second->name(),
- PrettyPrinter::print(wg.second->memory_limit(),
TUnit::BYTES),
-
PrettyPrinter::print(wgs_mem_info[wg.first].total_mem_used, TUnit::BYTES),
- PrettyPrinter::print(wg_weighted_mem_limit, TUnit::BYTES),
- PrettyPrinter::print(weighted_high_water_mark_limit,
TUnit::BYTES),
- (double)wgs_mem_info[wg.first].total_mem_used /
wg_weighted_mem_limit);
-
- debug_msg += "\n Query Memory Summary:";
- // check whether queries need to revoke memory for task group
- for (const auto& query_mem_tracker :
wgs_mem_info[wg.first].tracker_snapshots) {
- debug_msg += fmt::format(
- "\n MemTracker Label={}, Used={}, MemLimit={}, "
- "Peak={}",
- query_mem_tracker->label(),
- PrettyPrinter::print(query_mem_tracker->consumption(),
TUnit::BYTES),
- PrettyPrinter::print(query_mem_tracker->limit(),
TUnit::BYTES),
-
PrettyPrinter::print(query_mem_tracker->peak_consumption(), TUnit::BYTES));
- }
- }
-
- // If the wg enable over commit memory, then it is no need to update
query memlimit
- if (wg.second->enable_memory_overcommit()) {
- continue;
- }
- int32_t total_used_slot_count = 0;
- int32_t total_slot_count = wg.second->total_query_slot_count();
- // calculate total used slot count
- for (const auto& query : all_query_ctxs) {
- auto query_ctx = query.second.lock();
- if (!query_ctx) {
- continue;
- }
- total_used_slot_count += query_ctx->get_slot_count();
- }
- // calculate per query weighted memory limit
- debug_msg = "Query Memory Summary:";
- for (const auto& query : all_query_ctxs) {
- auto query_ctx = query.second.lock();
- if (!query_ctx) {
- continue;
- }
- int64_t query_weighted_mem_limit = 0;
- // If the query enable hard limit, then it should not use the soft
limit
- if (query_ctx->enable_query_slot_hard_limit()) {
- if (total_slot_count < 1) {
- LOG(WARNING)
- << "query " << print_id(query_ctx->query_id())
- << " enabled hard limit, but the slot count < 1,
could not take affect";
- } else {
- // If the query enable hard limit, then not use weighted
info any more, just use the settings limit.
- query_weighted_mem_limit =
(int64_t)((wg_high_water_mark_limit *
-
query_ctx->get_slot_count() * 1.0) /
- total_slot_count);
- }
- } else {
- // If low water mark is not reached, then use process memory
limit as query memory limit.
- // It means it will not take effect.
- // If there are some query in paused list, then limit should
take effect.
- if (!is_low_wartermark && wg.second->memory_sufficent()) {
- query_weighted_mem_limit = wg_high_water_mark_limit;
- } else {
- query_weighted_mem_limit =
- total_used_slot_count > 0
- ? (int64_t)((wg_high_water_mark_limit +
total_used_slot_count) *
- query_ctx->get_slot_count() *
1.0 /
- total_used_slot_count)
- : wg_high_water_mark_limit;
- }
- }
- debug_msg += fmt::format(
- "\n MemTracker Label={}, Used={}, Limit={}, Peak={}",
- query_ctx->get_mem_tracker()->label(),
-
PrettyPrinter::print(query_ctx->get_mem_tracker()->consumption(), TUnit::BYTES),
- PrettyPrinter::print(query_weighted_mem_limit,
TUnit::BYTES),
-
PrettyPrinter::print(query_ctx->get_mem_tracker()->peak_consumption(),
- TUnit::BYTES));
-
- query_ctx->set_mem_limit(query_weighted_mem_limit);
- }
- // During memory insufficent stage, we already set every query's
memlimit, so that the flag is useless any more.
- wg.second->update_memory_sufficent(true);
- LOG_EVERY_T(INFO, 60) << debug_msg;
+ change_query_to_hard_limit(wg.second, false);
}
}
@@ -330,26 +240,44 @@ void WorkloadGroupMgr::get_wg_resource_usage(vectorized::Block* block) {
}
}
-void WorkloadGroupMgr::add_paused_query(const std::shared_ptr<QueryContext>&
query_ctx) {
+void WorkloadGroupMgr::add_paused_query(const std::shared_ptr<QueryContext>&
query_ctx,
+ int64_t reserve_size) {
std::lock_guard<std::mutex> lock(_paused_queries_lock);
DCHECK(query_ctx != nullptr);
auto wg = query_ctx->workload_group();
auto&& [it, inserted] = _paused_queries_list[wg].emplace(
- query_ctx,
doris::GlobalMemoryArbitrator::last_affected_cache_capacity_adjust_weighted);
+ query_ctx,
doris::GlobalMemoryArbitrator::last_affected_cache_capacity_adjust_weighted,
+ doris::GlobalMemoryArbitrator::any_workload_group_exceed_limit,
reserve_size);
+ // Check if this is an invalid reserve, for example, if the reserve size
is too large, larger than the query limit
+ // if hard limit is enabled, then not need enable other queries hard limit.
if (inserted) {
LOG(INFO) << "workload group " << wg->debug_string()
<< " insert one new paused query: " << it->query_id();
}
}
+/**
+ * 1. When Process's memory is lower than soft limit, then all workload group
will be converted to hard limit (Exception: there is only one workload group).
+ * 2. Reserve logic for workload group that is soft limit take no effect, it
will always return success.
+ * 3. QueryLimit for streamload,routineload,group commit, take no affect, it
will always return success, but workload group's hard limit will take affect.
+ * 4. See handle_non_overcommit_wg_paused_queries for hard limit logic.
+ */
+void WorkloadGroupMgr::handle_paused_queries() {
+ handle_non_overcommit_wg_paused_queries();
+ handle_overcommit_wg_paused_queries();
+}
+
/**
* Strategy 1: A revocable query should not have any running
task(PipelineTask).
* strategy 2: If the workload group has any task exceed workload group
memlimit, then set all queryctx's memlimit
* strategy 3: If any query exceed process memlimit, then should clear all
caches.
* strategy 4: If any query exceed query's memlimit, then do spill disk or
cancel it.
- * strategy 5: If any query exceed process's memlimit and cache is zero, then
do spill disk or cancel it.
+ * strategy 5: If any query exceed process's memlimit and cache is zero, then
do following:
+ * 1. cancel other wg's(soft limit) query that exceed limit
+ * 2. spill disk
+ * 3. cancel it self.
*/
-void WorkloadGroupMgr::handle_paused_queries() {
+void WorkloadGroupMgr::handle_non_overcommit_wg_paused_queries() {
const int64_t TIMEOUT_IN_QUEUE = 1000L * 10;
std::unique_lock<std::mutex> lock(_paused_queries_lock);
for (auto it = _paused_queries_list.begin(); it !=
_paused_queries_list.end();) {
@@ -358,16 +286,14 @@ void WorkloadGroupMgr::handle_paused_queries() {
if (queries_list.empty()) {
LOG(INFO) << "wg: " << wg->debug_string()
<< " has no paused query, update it to memory sufficent";
- wg->update_memory_sufficent(true);
it = _paused_queries_list.erase(it);
continue;
}
-
bool is_low_wartermark = false;
bool is_high_wartermark = false;
wg->check_mem_used(&is_low_wartermark, &is_high_wartermark);
-
+ bool has_changed_hard_limit = false;
// If the query is paused because its limit exceed the query itself's
memlimit, then just spill disk.
// The query's memlimit is set using slot mechanism and its value is
set using the user settings, not
// by weighted value. So if reserve failed, then it is actually exceed
limit.
@@ -379,17 +305,30 @@ void WorkloadGroupMgr::handle_paused_queries() {
continue;
}
if (query_ctx->is_cancelled()) {
- /// Memory may not be released immediately after a query is
canceled.
- /// So here wait for a while.
- if (query_it->elapsed_time() > TIMEOUT_IN_QUEUE) {
- LOG(INFO) << "query: " << print_id(query_ctx->query_id())
- << " was canceled, remove from paused list";
- query_it = queries_list.erase(query_it);
- }
+ LOG(INFO) << "query: " << print_id(query_ctx->query_id())
+ << " was canceled, remove from paused list";
+ query_it = queries_list.erase(query_it);
continue;
}
+
+ // Only deal with non overcommit workload group.
+ if (wg->enable_memory_overcommit() &&
!wg->has_changed_to_hard_limit() &&
+
!query_ctx->paused_reason().is<ErrorCode::PROCESS_MEMORY_EXCEEDED>()) {
+ // Soft limit wg will only reserve failed when process limit
exceed. But in some corner case,
+ // when reserve, the wg is hard limit, the query reserve
failed, but when this loop run
+ // the wg is converted to soft limit.
+ // So that should resume the query.
+ LOG(WARNING) << "query: " << print_id(query_ctx->query_id())
+ << " reserve memory failed, but workload group
not converted to hard "
+ "limit, it should not happen, resume it again.
paused reason: "
+ << query_ctx->paused_reason();
+ query_ctx->set_memory_sufficient(true);
+ query_it = queries_list.erase(query_it);
+ continue;
+ }
+
if
(query_ctx->paused_reason().is<ErrorCode::QUERY_MEMORY_EXCEEDED>()) {
- bool spill_res = spill_or_cancel_query(query_ctx,
query_ctx->paused_reason());
+ bool spill_res = handle_single_query(query_ctx,
query_ctx->paused_reason());
if (!spill_res) {
++query_it;
continue;
@@ -398,14 +337,51 @@ void WorkloadGroupMgr::handle_paused_queries() {
continue;
}
} else if
(query_ctx->paused_reason().is<ErrorCode::WORKLOAD_GROUP_MEMORY_EXCEEDED>()) {
- if (wg->memory_sufficent()) {
- wg->update_memory_sufficent(false);
+ if (!has_changed_hard_limit) {
+ change_query_to_hard_limit(wg, true);
+ has_changed_hard_limit = true;
LOG(INFO) << "query: " << print_id(query_ctx->query_id())
<< " reserve memory failed due to workload group
memory exceed, "
"should set the workload group work in memory
insufficent mode, "
"so that other query will reduce their
memory. wg: "
<< wg->debug_string();
}
+ // If there are a lot of memtable memory, then wait them flush
finished.
+ MemTableMemoryLimiter* memtable_limiter =
+
doris::ExecEnv::GetInstance()->memtable_memory_limiter();
+ // Not use memlimit, should use high water mark.
+ int64_t memtable_active_bytes = 0;
+ int64_t memtable_queue_bytes = 0;
+ int64_t memtable_flush_bytes = 0;
+ wg->get_load_mem_usage(&memtable_active_bytes,
&memtable_queue_bytes,
+ &memtable_flush_bytes);
+ // TODO: should add a signal in memtable limiter to prevent
new batch
+ // For example, streamload, it will not reserve many memory,
but it will occupy many memtable memory.
+ // TODO: 0.2 should be a workload group properties. For
example, the group is optimized for load,then the value
+ // should be larged, if the group is optimized for query, then
the value should be smaller.
+ int64_t max_wg_memtable_bytes = wg->load_buffer_limit();
+ if (memtable_active_bytes + memtable_queue_bytes +
memtable_flush_bytes >
+ max_wg_memtable_bytes) {
+ // There are many table in flush queue, just waiting them
flush finished.
+ if (memtable_active_bytes <
(int64_t)(max_wg_memtable_bytes * 0.6)) {
+ LOG_EVERY_T(INFO, 60)
+ << wg->name() << " load memtable size is: " <<
memtable_active_bytes
+ << ", " << memtable_queue_bytes << ", " <<
memtable_flush_bytes
+ << ", load buffer limit is: " <<
max_wg_memtable_bytes
+ << " wait for flush finished to release more
memory";
+ continue;
+ } else {
+ // Flush some memtables(currently written) to flush
queue.
+ memtable_limiter->flush_workload_group_memtables(
+ wg->id(),
+ memtable_active_bytes -
(int64_t)(max_wg_memtable_bytes * 0.6));
+ LOG_EVERY_T(INFO, 60)
+ << wg->name() << " load memtable size is: " <<
memtable_active_bytes
+ << ", " << memtable_queue_bytes << ", " <<
memtable_flush_bytes
+ << ", flush some active memtable to revoke
memory";
+ continue;
+ }
+ }
// Should not put the query back to task scheduler
immediately, because when wg's memory not sufficient,
// and then set wg's flag, other query may not free memory
very quickly.
if (query_it->elapsed_time() > TIMEOUT_IN_QUEUE) {
@@ -418,6 +394,8 @@ void WorkloadGroupMgr::handle_paused_queries() {
}
continue;
} else {
+ // PROCESS Reserve logic using hard limit, if reached here,
should try to spill or cancel.
+ // GC Logic also work at hard limit, so GC may cancel some
query and could not spill here.
// If wg's memlimit not exceed, but process memory exceed, it
means cache or other metadata
// used too much memory. Should clean all cache here.
// 1. Check cache used, if cache is larger than > 0, then just
return and wait for it to 0 to release some memory.
@@ -433,7 +411,8 @@ void WorkloadGroupMgr::handle_paused_queries() {
"to 0 now";
}
if (query_it->cache_ratio_ < 0.001) {
- bool spill_res = spill_or_cancel_query(query_ctx,
query_ctx->paused_reason());
+ // TODO: Find other exceed limit workload group and cancel
query.
+ bool spill_res = handle_single_query(query_ctx,
query_ctx->paused_reason());
if (!spill_res) {
++query_it;
continue;
@@ -460,8 +439,41 @@ void WorkloadGroupMgr::handle_paused_queries() {
}
}
-bool WorkloadGroupMgr::spill_or_cancel_query(std::shared_ptr<QueryContext>
query_ctx,
- Status paused_reason) {
+void WorkloadGroupMgr::handle_overcommit_wg_paused_queries() {
+ std::shared_lock<std::shared_mutex> r_lock(_group_mutex);
+ // If there is only one workload group and it is overcommit, then do
nothing.
+ // And should also start MinorGC logic.
+ if (_workload_groups.size() == 1) {
+ return;
+ }
+ if (doris::GlobalMemoryArbitrator::is_exceed_soft_mem_limit(100 * 1024 *
1024)) {
+ for (auto& [wg_id, wg] : _workload_groups) {
+ if (wg->enable_memory_overcommit() &&
!wg->has_changed_to_hard_limit()) {
+ wg->change_to_hard_limit(true);
+ LOG(INFO) << "Process memory usage will exceed soft limit,
change all workload "
+ "group with overcommit to hard limit now. "
+ << wg->debug_string();
+ }
+ }
+ }
+ // If current memory usage is below soft memlimit - 10%, then enable wg's
overcommit
+ if (!doris::GlobalMemoryArbitrator::is_exceed_soft_mem_limit(
+ (int64_t)(MemInfo::mem_limit() * 0.1))) {
+ for (auto& [wg_id, wg] : _workload_groups) {
+ if (wg->enable_memory_overcommit() &&
wg->has_changed_to_hard_limit()) {
+ wg->change_to_hard_limit(false);
+ LOG(INFO) << "Process memory usage is lower than soft limit,
enable all workload "
+ "group overcommit now. "
+ << wg->debug_string();
+ }
+ }
+ }
+}
+// If the query could release some memory, for example, spill disk, flush
memtable then the return value is true.
+// If the query could not release memory, then cancel the query, the return
value is true.
+// If the query is not ready to do these tasks, it means just wait.
+bool WorkloadGroupMgr::handle_single_query(std::shared_ptr<QueryContext>
query_ctx,
+ Status paused_reason) {
// TODO: If the query is an insert into select query, should consider
memtable as revoke memory.
size_t revocable_size = 0;
size_t memory_usage = 0;
@@ -494,6 +506,109 @@ bool WorkloadGroupMgr::spill_or_cancel_query(std::shared_ptr<QueryContext> query
return true;
}
+void WorkloadGroupMgr::change_query_to_hard_limit(WorkloadGroupPtr wg, bool
enable_hard_limit) {
+ auto wg_mem_limit = wg->memory_limit();
+ auto wg_weighted_mem_limit = int64_t(wg_mem_limit * 1);
+ wg->set_weighted_memory_limit(wg_weighted_mem_limit);
+ auto all_query_ctxs = wg->queries();
+ bool is_low_wartermark = false;
+ bool is_high_wartermark = false;
+ wg->check_mem_used(&is_low_wartermark, &is_high_wartermark);
+ int64_t wg_high_water_mark_limit =
+ (int64_t)(wg_mem_limit * wg->spill_threshold_high_water_mark() *
1.0 / 100);
+ int64_t memtable_active_bytes = 0;
+ int64_t memtable_queue_bytes = 0;
+ int64_t memtable_flush_bytes = 0;
+ wg->get_load_mem_usage(&memtable_active_bytes, &memtable_queue_bytes,
&memtable_flush_bytes);
+ int64_t memtable_usage = memtable_active_bytes + memtable_queue_bytes +
memtable_flush_bytes;
+ int64_t wg_high_water_mark_except_load = wg_high_water_mark_limit;
+ if (memtable_usage > wg->load_buffer_limit()) {
+ wg_high_water_mark_except_load = wg_high_water_mark_limit -
wg->load_buffer_limit();
+ } else {
+ wg_high_water_mark_except_load =
+ wg_high_water_mark_limit - memtable_usage - 10 * 1024 * 1024;
+ }
+ std::string debug_msg;
+ if (is_high_wartermark || is_low_wartermark) {
+ debug_msg = fmt::format(
+ "\nWorkload Group {}: mem limit: {}, mem used: {}, weighted
mem limit: {}, "
+ "high water mark mem limit: {}, load memtable usage: {}, used
ratio: {}",
+ wg->name(), PrettyPrinter::print(wg->memory_limit(),
TUnit::BYTES),
+ PrettyPrinter::print(wg->total_mem_used(), TUnit::BYTES),
+ PrettyPrinter::print(wg_weighted_mem_limit, TUnit::BYTES),
+ PrettyPrinter::print(wg_high_water_mark_limit, TUnit::BYTES),
+ PrettyPrinter::print(memtable_usage, TUnit::BYTES),
+ (double)(wg->total_mem_used()) / wg_weighted_mem_limit);
+ }
+
+ // If the wg enable over commit memory, then it is no need to update query
memlimit
+ if (wg->enable_memory_overcommit() && !wg->has_changed_to_hard_limit()) {
+ return;
+ }
+ int32_t total_used_slot_count = 0;
+ int32_t total_slot_count = wg->total_query_slot_count();
+ // calculate total used slot count
+ for (const auto& query : all_query_ctxs) {
+ auto query_ctx = query.second.lock();
+ if (!query_ctx) {
+ continue;
+ }
+ // Streamload kafka load group commit, not modify slot
+ if (!query_ctx->is_pure_load_task()) {
+ total_used_slot_count += query_ctx->get_slot_count();
+ }
+ }
+ // calculate per query weighted memory limit
+ debug_msg = "Query Memory Summary:";
+ for (const auto& query : all_query_ctxs) {
+ auto query_ctx = query.second.lock();
+ if (!query_ctx) {
+ continue;
+ }
+ int64_t query_weighted_mem_limit = 0;
+ // If the query enable hard limit, then it should not use the soft
limit
+ if (query_ctx->enable_query_slot_hard_limit()) {
+ if (total_slot_count < 1) {
+ LOG(WARNING)
+ << "query " << print_id(query_ctx->query_id())
+ << " enabled hard limit, but the slot count < 1, could
not take affect";
+ } else {
+ // If the query enable hard limit, then not use weighted info
any more, just use the settings limit.
+ query_weighted_mem_limit =
(int64_t)((wg_high_water_mark_except_load *
+
query_ctx->get_slot_count() * 1.0) /
+ total_slot_count);
+ }
+ } else {
+ // If low water mark is not reached, then use process memory limit
as query memory limit.
+ // It means it will not take effect.
+ // If there are some query in paused list, then limit should take
effect.
+ if (!is_low_wartermark && !enable_hard_limit) {
+ query_weighted_mem_limit = wg_high_water_mark_except_load;
+ } else {
+ query_weighted_mem_limit = total_used_slot_count > 0
+ ?
(int64_t)((wg_high_water_mark_except_load +
+
total_used_slot_count) *
+
query_ctx->get_slot_count() * 1.0 /
+
total_used_slot_count)
+ :
wg_high_water_mark_except_load;
+ }
+ }
+ debug_msg += fmt::format(
+ "\n MemTracker Label={}, Used={}, Limit={}, Peak={}",
+ query_ctx->get_mem_tracker()->label(),
+
PrettyPrinter::print(query_ctx->get_mem_tracker()->consumption(), TUnit::BYTES),
+ PrettyPrinter::print(query_weighted_mem_limit, TUnit::BYTES),
+
PrettyPrinter::print(query_ctx->get_mem_tracker()->peak_consumption(),
+ TUnit::BYTES));
+ // If the query is a pure load task, then should not modify its limit.
Or it will reserve
+ // memory failed and we did not hanle it.
+ if (!query_ctx->is_pure_load_task()) {
+ query_ctx->set_mem_limit(query_weighted_mem_limit);
+ }
+ }
+ LOG_EVERY_T(INFO, 60) << debug_msg;
+}
+
void WorkloadGroupMgr::stop() {
for (auto iter = _workload_groups.begin(); iter != _workload_groups.end();
iter++) {
iter->second->try_stop_schedulers();
diff --git a/be/src/runtime/workload_group/workload_group_manager.h b/be/src/runtime/workload_group/workload_group_manager.h
index f84bf3a29ff..8f69d5653b4 100644
--- a/be/src/runtime/workload_group/workload_group_manager.h
+++ b/be/src/runtime/workload_group/workload_group_manager.h
@@ -45,8 +45,11 @@ public:
std::chrono::system_clock::time_point enqueue_at;
size_t last_mem_usage {0};
double cache_ratio_ {0.0};
+ bool any_wg_exceed_limit_ {false};
+ int64_t reserve_size_ {0};
- PausedQuery(std::shared_ptr<QueryContext> query_ctx, double cache_ratio);
+ PausedQuery(std::shared_ptr<QueryContext> query_ctx, double cache_ratio,
+ bool any_wg_exceed_limit, int64_t reserve_size);
int64_t elapsed_time() const {
auto now = std::chrono::system_clock::now();
@@ -96,14 +99,17 @@ public:
void get_wg_resource_usage(vectorized::Block* block);
- void add_paused_query(const std::shared_ptr<QueryContext>& query_ctx);
+ void add_paused_query(const std::shared_ptr<QueryContext>& query_ctx,
int64_t reserve_size);
void handle_paused_queries();
void update_load_memtable_usage(const std::map<uint64_t, MemtableUsage>&
wg_memtable_usages);
private:
- bool spill_or_cancel_query(std::shared_ptr<QueryContext> query_ctx, Status
paused_reason);
+ bool handle_single_query(std::shared_ptr<QueryContext> query_ctx, Status
paused_reason);
+ void handle_non_overcommit_wg_paused_queries();
+ void handle_overcommit_wg_paused_queries();
+ void change_query_to_hard_limit(WorkloadGroupPtr wg, bool
enable_hard_limit);
private:
std::shared_mutex _group_mutex;
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]