This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch spill_and_reserve
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/spill_and_reserve by this push:
new 059be6317dd add load buffer limit for load buffer management (#41115)
059be6317dd is described below
commit 059be6317dddefe338f4ce8cf2b05e31292c8175
Author: yiguolei <[email protected]>
AuthorDate: Mon Sep 23 12:54:21 2024 +0800
add load buffer limit for load buffer management (#41115)
## Proposed changes
Issue Number: close #xxx
<!--Describe your changes.-->
---------
Co-authored-by: yiguolei <[email protected]>
---
be/src/agent/workload_group_listener.cpp | 2 ++
be/src/runtime/workload_group/workload_group.cpp | 29 ++++++++++++++------
be/src/runtime/workload_group/workload_group.h | 6 ++++
.../workload_group/workload_group_manager.cpp | 32 +++++++++++-----------
be/src/util/mem_info.cpp | 5 ++--
5 files changed, 48 insertions(+), 26 deletions(-)
diff --git a/be/src/agent/workload_group_listener.cpp
b/be/src/agent/workload_group_listener.cpp
index f0f57869f25..05e72c4038a 100644
--- a/be/src/agent/workload_group_listener.cpp
+++ b/be/src/agent/workload_group_listener.cpp
@@ -32,6 +32,8 @@ void WorkloadGroupListener::handle_topic_info(const
std::vector<TopicInfo>& topi
if (!topic_info.__isset.workload_group_info) {
continue;
}
+ LOG(INFO) << "Received publish workload group info request: "
+ << apache::thrift::ThriftDebugString(topic_info).c_str();
is_set_workload_group_info = true;
// 1 parse topic info to group info
diff --git a/be/src/runtime/workload_group/workload_group.cpp
b/be/src/runtime/workload_group/workload_group.cpp
index dac52e1def4..b4afe462696 100644
--- a/be/src/runtime/workload_group/workload_group.cpp
+++ b/be/src/runtime/workload_group/workload_group.cpp
@@ -67,7 +67,8 @@ WorkloadGroup::WorkloadGroup(const WorkloadGroupInfo& tg_info)
_spill_low_watermark(tg_info.spill_low_watermark),
_spill_high_watermark(tg_info.spill_high_watermark),
_scan_bytes_per_second(tg_info.read_bytes_per_second),
- _remote_scan_bytes_per_second(tg_info.remote_read_bytes_per_second) {
+ _remote_scan_bytes_per_second(tg_info.remote_read_bytes_per_second),
+ _total_query_slot_count(tg_info.total_query_slot_count) {
std::vector<DataDirInfo>& data_dir_list =
io::BeConfDataDirReader::be_config_data_dir_list;
for (const auto& data_dir : data_dir_list) {
_scan_io_throttle_map[data_dir.path] =
@@ -82,21 +83,33 @@ WorkloadGroup::WorkloadGroup(const WorkloadGroupInfo&
tg_info)
std::make_unique<bvar::Adder<size_t>>(_name,
"total_local_read_bytes");
_total_local_scan_io_per_second =
std::make_unique<bvar::PerSecond<bvar::Adder<size_t>>>(
_name, "total_local_read_bytes_per_second",
_total_local_scan_io_adder.get(), 1);
+ _load_buffer_limit = (int64_t)(_memory_limit * 0.2);
}
std::string WorkloadGroup::debug_string() const {
std::shared_lock<std::shared_mutex> rl {_mutex};
+ auto realtime_total_mem_used = _total_mem_used +
_wg_refresh_interval_memory_growth.load();
+ auto mem_used_ratio = realtime_total_mem_used /
((double)_weighted_memory_limit + 1);
return fmt::format(
- "TG[id = {}, name = {}, cpu_share = {}, memory_limit = {},
enable_memory_overcommit = "
- "{}, version = {}, cpu_hard_limit = {}, scan_thread_num = "
+ "WorkloadGroup[id = {}, name = {}, version = {}, cpu_share = {}, "
+ "total_query_slot_count={}, "
+ "memory_limit = {}, "
+ "enable_memory_overcommit = {}, weighted_memory_limit = {},
total_mem_used = {},"
+ "wg_refresh_interval_memory_growth = {}, mem_used_ratio = {},
spill_low_watermark = "
+ "{}, spill_high_watermark = {},cpu_hard_limit = {},
scan_thread_num = "
"{}, max_remote_scan_thread_num = {}, min_remote_scan_thread_num =
{}, "
- "spill_low_watermark={}, spill_high_watermark={}, is_shutdown={},
query_num={}, "
+ "is_shutdown={}, query_num={}, "
"read_bytes_per_second={}, remote_read_bytes_per_second={}]",
- _id, _name, cpu_share(), PrettyPrinter::print(_memory_limit,
TUnit::BYTES),
- _enable_memory_overcommit ? "true" : "false", _version,
cpu_hard_limit(),
+ _id, _name, _version, cpu_share(), _total_query_slot_count,
+ PrettyPrinter::print(_memory_limit, TUnit::BYTES),
+ _enable_memory_overcommit ? "true" : "false",
+ PrettyPrinter::print(_weighted_memory_limit.load(), TUnit::BYTES),
+ PrettyPrinter::print(_total_mem_used.load(), TUnit::BYTES),
+ PrettyPrinter::print(_wg_refresh_interval_memory_growth.load(),
TUnit::BYTES),
+ mem_used_ratio, _spill_low_watermark, _spill_high_watermark,
cpu_hard_limit(),
_scan_thread_num, _max_remote_scan_thread_num,
_min_remote_scan_thread_num,
- _spill_low_watermark, _spill_high_watermark, _is_shutdown,
_query_ctxs.size(),
- _scan_bytes_per_second, _remote_scan_bytes_per_second);
+ _is_shutdown, _query_ctxs.size(), _scan_bytes_per_second,
+ _remote_scan_bytes_per_second);
}
std::string WorkloadGroup::memory_debug_string() const {
diff --git a/be/src/runtime/workload_group/workload_group.h
b/be/src/runtime/workload_group/workload_group.h
index abbb585692b..b00bbfe68b5 100644
--- a/be/src/runtime/workload_group/workload_group.h
+++ b/be/src/runtime/workload_group/workload_group.h
@@ -233,12 +233,18 @@ public:
}
int64_t get_remote_scan_bytes_per_second();
+ int64_t load_buffer_limit() { return _load_buffer_limit; }
+
private:
mutable std::shared_mutex _mutex; // lock _name, _version, _cpu_share,
_memory_limit
const uint64_t _id;
std::string _name;
int64_t _version;
int64_t _memory_limit; // bytes
+ // For example, load memtable, write to parquet.
+ // If the wg's memory reached high water mark, then the load buffer
+ // will be restricted to this limit.
+ int64_t _load_buffer_limit;
// memory used by load memtable
int64_t _active_mem_usage = 0;
diff --git a/be/src/runtime/workload_group/workload_group_manager.cpp
b/be/src/runtime/workload_group/workload_group_manager.cpp
index 9baf193d721..39e20db4fca 100644
--- a/be/src/runtime/workload_group/workload_group_manager.cpp
+++ b/be/src/runtime/workload_group/workload_group_manager.cpp
@@ -185,6 +185,8 @@ void WorkloadGroupMgr::refresh_wg_weighted_memory_limit() {
if (all_workload_groups_mem_usage < process_memory_usage) {
int64_t public_memory = process_memory_usage -
all_workload_groups_mem_usage;
weighted_memory_limit_ratio = 1 - (double)public_memory /
(double)process_memory_limit;
+ // Round the value from 1% to 100%.
+ weighted_memory_limit_ratio = std::floor(weighted_memory_limit_ratio *
100) / 100;
}
std::string debug_msg = fmt::format(
@@ -204,9 +206,10 @@ void WorkloadGroupMgr::refresh_wg_weighted_memory_limit() {
bool is_high_wartermark = false;
wg.second->check_mem_used(&is_low_wartermark, &is_high_wartermark);
int64_t wg_high_water_mark_limit =
- wg_mem_limit * wg.second->spill_threshold_high_water_mark() /
100;
+ (int64_t)(wg_mem_limit *
wg.second->spill_threshold_high_water_mark() * 1.0 / 100);
int64_t weighted_high_water_mark_limit =
- wg_weighted_mem_limit *
wg.second->spill_threshold_high_water_mark() / 100;
+ (int64_t)(wg_weighted_mem_limit *
wg.second->spill_threshold_high_water_mark() *
+ 1.0 / 100);
std::string debug_msg;
if (is_high_wartermark || is_low_wartermark) {
debug_msg = fmt::format(
@@ -230,7 +233,6 @@ void WorkloadGroupMgr::refresh_wg_weighted_memory_limit() {
PrettyPrinter::print(query_mem_tracker->limit(),
TUnit::BYTES),
PrettyPrinter::print(query_mem_tracker->peak_consumption(), TUnit::BYTES));
}
- continue;
}
// If the wg enable over commit memory, then it is no need to update
query memlimit
@@ -263,20 +265,21 @@ void WorkloadGroupMgr::refresh_wg_weighted_memory_limit()
{
<< " enabled hard limit, but the slot count < 1,
could not take affect";
} else {
// If the query enable hard limit, then not use weighted
info any more, just use the settings limit.
- query_weighted_mem_limit =
- (wg_high_water_mark_limit *
query_ctx->get_slot_count()) /
- total_slot_count;
+ query_weighted_mem_limit =
(int64_t)((wg_high_water_mark_limit *
+
query_ctx->get_slot_count() * 1.0) /
+ total_slot_count);
}
} else {
// If low water mark is not reached, then use process memory
limit as query memory limit.
// It means it will not take effect.
if (!is_low_wartermark) {
- query_weighted_mem_limit = process_memory_limit;
+ query_weighted_mem_limit = wg_high_water_mark_limit;
} else {
query_weighted_mem_limit =
total_used_slot_count > 0
- ? (wg_high_water_mark_limit +
total_used_slot_count) *
- query_ctx->get_slot_count() /
total_used_slot_count
+ ? (int64_t)((wg_high_water_mark_limit +
total_used_slot_count) *
+ query_ctx->get_slot_count() *
1.0 /
+ total_used_slot_count)
: wg_high_water_mark_limit;
}
}
@@ -407,8 +410,6 @@ void WorkloadGroupMgr::handle_paused_queries() {
MemTableMemoryLimiter* memtable_limiter =
doris::ExecEnv::GetInstance()->memtable_memory_limiter();
// Do not use memlimit; the high water mark should be used instead.
- int64_t wg_high_water_mark_limit =
- wg->memory_limit() * wg->spill_threshold_high_water_mark() /
100;
int64_t memtable_active_bytes = 0;
int64_t memtable_write_bytes = 0;
int64_t memtable_flush_bytes = 0;
@@ -418,21 +419,20 @@ void WorkloadGroupMgr::handle_paused_queries() {
// For example, streamload: it will not reserve much memory, but it
will occupy a lot of memtable memory.
// TODO: 0.2 should be a workload group property. For example, if the
group is optimized for load, then the value
// should be larger; if the group is optimized for query, then the
value should be smaller.
- int64_t max_wg_memtable_bytes =
- std::max<int64_t>(100 * 1024 * 1024,
(int64_t)(wg_high_water_mark_limit * 0.2));
+ int64_t max_wg_memtable_bytes = wg->load_buffer_limit();
if (memtable_active_bytes + memtable_write_bytes +
memtable_flush_bytes >
max_wg_memtable_bytes) {
// There are many table in flush queue, just waiting them flush
finished.
- if (memtable_write_bytes + memtable_flush_bytes >
max_wg_memtable_bytes / 2) {
+ if (memtable_active_bytes < (int64_t)(max_wg_memtable_bytes *
0.8)) {
LOG_EVERY_T(INFO, 60)
<< wg->name() << " load memtable size is: " <<
memtable_active_bytes << ", "
<< memtable_write_bytes << ", " << memtable_flush_bytes
<< ", wait for flush finished to release more memory";
continue;
} else {
- // Flush 50% active bytes, it means flush some
memtables(currently written) to flush queue.
+ // Flush some memtables(currently written) to flush queue.
memtable_limiter->flush_workload_group_memtables(
- wg->id(), (int64_t)(memtable_active_bytes * 0.5));
+ wg->id(), memtable_active_bytes -
(int64_t)(max_wg_memtable_bytes * 0.8));
LOG_EVERY_T(INFO, 60)
<< wg->name() << " load memtable size is: " <<
memtable_active_bytes << ", "
<< memtable_write_bytes << ", " << memtable_flush_bytes
diff --git a/be/src/util/mem_info.cpp b/be/src/util/mem_info.cpp
index d98cd2a3664..988a0ec8626 100644
--- a/be/src/util/mem_info.cpp
+++ b/be/src/util/mem_info.cpp
@@ -208,8 +208,9 @@ void MemInfo::refresh_proc_meminfo() {
_s_cgroup_mem_usage = cgroup_mem_usage;
// wait 10s, 100 * 100ms, avoid too frequently.
_s_cgroup_mem_refresh_wait_times = -100;
- LOG(INFO) << "Refresh cgroup memory win, refresh again after 10s,
cgroup mem limit: "
- << _s_cgroup_mem_limit << ", cgroup mem usage: " <<
_s_cgroup_mem_usage;
+ LOG(INFO)
+ << "Refresh cgroup memory success, refresh again after
10s, cgroup mem limit: "
+ << _s_cgroup_mem_limit << ", cgroup mem usage: " <<
_s_cgroup_mem_usage;
} else {
// find cgroup failed, wait 300s, 1000 * 100ms.
_s_cgroup_mem_refresh_wait_times = -3000;
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]