This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch spill_and_reserve
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/spill_and_reserve by this push:
     new 059be6317dd add load buffer limit for load buffer management (#41115)
059be6317dd is described below

commit 059be6317dddefe338f4ce8cf2b05e31292c8175
Author: yiguolei <[email protected]>
AuthorDate: Mon Sep 23 12:54:21 2024 +0800

    add load buffer limit for load buffer management (#41115)
    
    ## Proposed changes
    
    Issue Number: close #xxx
    
    <!--Describe your changes.-->
    
    ---------
    
    Co-authored-by: yiguolei <[email protected]>
---
 be/src/agent/workload_group_listener.cpp           |  2 ++
 be/src/runtime/workload_group/workload_group.cpp   | 29 ++++++++++++++------
 be/src/runtime/workload_group/workload_group.h     |  6 ++++
 .../workload_group/workload_group_manager.cpp      | 32 +++++++++++-----------
 be/src/util/mem_info.cpp                           |  5 ++--
 5 files changed, 48 insertions(+), 26 deletions(-)

diff --git a/be/src/agent/workload_group_listener.cpp b/be/src/agent/workload_group_listener.cpp
index f0f57869f25..05e72c4038a 100644
--- a/be/src/agent/workload_group_listener.cpp
+++ b/be/src/agent/workload_group_listener.cpp
@@ -32,6 +32,8 @@ void WorkloadGroupListener::handle_topic_info(const std::vector<TopicInfo>& topi
         if (!topic_info.__isset.workload_group_info) {
             continue;
         }
+        LOG(INFO) << "Received publish workload group info request: "
+                  << apache::thrift::ThriftDebugString(topic_info).c_str();
         is_set_workload_group_info = true;
 
         // 1 parse topic info to group info
diff --git a/be/src/runtime/workload_group/workload_group.cpp b/be/src/runtime/workload_group/workload_group.cpp
index dac52e1def4..b4afe462696 100644
--- a/be/src/runtime/workload_group/workload_group.cpp
+++ b/be/src/runtime/workload_group/workload_group.cpp
@@ -67,7 +67,8 @@ WorkloadGroup::WorkloadGroup(const WorkloadGroupInfo& tg_info)
           _spill_low_watermark(tg_info.spill_low_watermark),
           _spill_high_watermark(tg_info.spill_high_watermark),
           _scan_bytes_per_second(tg_info.read_bytes_per_second),
-          _remote_scan_bytes_per_second(tg_info.remote_read_bytes_per_second) {
+          _remote_scan_bytes_per_second(tg_info.remote_read_bytes_per_second),
+          _total_query_slot_count(tg_info.total_query_slot_count) {
     std::vector<DataDirInfo>& data_dir_list = 
io::BeConfDataDirReader::be_config_data_dir_list;
     for (const auto& data_dir : data_dir_list) {
         _scan_io_throttle_map[data_dir.path] =
@@ -82,21 +83,33 @@ WorkloadGroup::WorkloadGroup(const WorkloadGroupInfo& tg_info)
             std::make_unique<bvar::Adder<size_t>>(_name, 
"total_local_read_bytes");
     _total_local_scan_io_per_second = 
std::make_unique<bvar::PerSecond<bvar::Adder<size_t>>>(
             _name, "total_local_read_bytes_per_second", 
_total_local_scan_io_adder.get(), 1);
+    _load_buffer_limit = (int64_t)(_memory_limit * 0.2);
 }
 
 std::string WorkloadGroup::debug_string() const {
     std::shared_lock<std::shared_mutex> rl {_mutex};
+    auto realtime_total_mem_used = _total_mem_used + 
_wg_refresh_interval_memory_growth.load();
+    auto mem_used_ratio = realtime_total_mem_used / 
((double)_weighted_memory_limit + 1);
     return fmt::format(
-            "TG[id = {}, name = {}, cpu_share = {}, memory_limit = {}, 
enable_memory_overcommit = "
-            "{}, version = {}, cpu_hard_limit = {}, scan_thread_num = "
+            "WorkloadGroup[id = {}, name = {}, version = {}, cpu_share = {}, "
+            "total_query_slot_count={}, "
+            "memory_limit = {}, "
+            "enable_memory_overcommit = {},  weighted_memory_limit = {}, 
total_mem_used = {},"
+            "wg_refresh_interval_memory_growth = {},  mem_used_ratio = {}, 
spill_low_watermark = "
+            "{}, spill_high_watermark = {},cpu_hard_limit = {}, 
scan_thread_num = "
             "{}, max_remote_scan_thread_num = {}, min_remote_scan_thread_num = 
{}, "
-            "spill_low_watermark={}, spill_high_watermark={}, is_shutdown={}, 
query_num={}, "
+            "is_shutdown={}, query_num={}, "
             "read_bytes_per_second={}, remote_read_bytes_per_second={}]",
-            _id, _name, cpu_share(), PrettyPrinter::print(_memory_limit, 
TUnit::BYTES),
-            _enable_memory_overcommit ? "true" : "false", _version, 
cpu_hard_limit(),
+            _id, _name, _version, cpu_share(), _total_query_slot_count,
+            PrettyPrinter::print(_memory_limit, TUnit::BYTES),
+            _enable_memory_overcommit ? "true" : "false",
+            PrettyPrinter::print(_weighted_memory_limit.load(), TUnit::BYTES),
+            PrettyPrinter::print(_total_mem_used.load(), TUnit::BYTES),
+            PrettyPrinter::print(_wg_refresh_interval_memory_growth.load(), 
TUnit::BYTES),
+            mem_used_ratio, _spill_low_watermark, _spill_high_watermark, 
cpu_hard_limit(),
             _scan_thread_num, _max_remote_scan_thread_num, 
_min_remote_scan_thread_num,
-            _spill_low_watermark, _spill_high_watermark, _is_shutdown, 
_query_ctxs.size(),
-            _scan_bytes_per_second, _remote_scan_bytes_per_second);
+            _is_shutdown, _query_ctxs.size(), _scan_bytes_per_second,
+            _remote_scan_bytes_per_second);
 }
 
 std::string WorkloadGroup::memory_debug_string() const {
diff --git a/be/src/runtime/workload_group/workload_group.h b/be/src/runtime/workload_group/workload_group.h
index abbb585692b..b00bbfe68b5 100644
--- a/be/src/runtime/workload_group/workload_group.h
+++ b/be/src/runtime/workload_group/workload_group.h
@@ -233,12 +233,18 @@ public:
     }
     int64_t get_remote_scan_bytes_per_second();
 
+    int64_t load_buffer_limit() { return _load_buffer_limit; }
+
 private:
     mutable std::shared_mutex _mutex; // lock _name, _version, _cpu_share, 
_memory_limit
     const uint64_t _id;
     std::string _name;
     int64_t _version;
     int64_t _memory_limit; // bytes
+    // For example, load memtable, write to parquet.
+    // If the wg's memory reached high water mark, then the load buffer
+    // will be restricted to this limit.
+    int64_t _load_buffer_limit;
 
     // memory used by load memtable
     int64_t _active_mem_usage = 0;
diff --git a/be/src/runtime/workload_group/workload_group_manager.cpp b/be/src/runtime/workload_group/workload_group_manager.cpp
index 9baf193d721..39e20db4fca 100644
--- a/be/src/runtime/workload_group/workload_group_manager.cpp
+++ b/be/src/runtime/workload_group/workload_group_manager.cpp
@@ -185,6 +185,8 @@ void WorkloadGroupMgr::refresh_wg_weighted_memory_limit() {
     if (all_workload_groups_mem_usage < process_memory_usage) {
         int64_t public_memory = process_memory_usage - 
all_workload_groups_mem_usage;
         weighted_memory_limit_ratio = 1 - (double)public_memory / 
(double)process_memory_limit;
+        // Round the value from 1% to 100%.
+        weighted_memory_limit_ratio = std::floor(weighted_memory_limit_ratio * 
100) / 100;
     }
 
     std::string debug_msg = fmt::format(
@@ -204,9 +206,10 @@ void WorkloadGroupMgr::refresh_wg_weighted_memory_limit() {
         bool is_high_wartermark = false;
         wg.second->check_mem_used(&is_low_wartermark, &is_high_wartermark);
         int64_t wg_high_water_mark_limit =
-                wg_mem_limit * wg.second->spill_threshold_high_water_mark() / 
100;
+                (int64_t)(wg_mem_limit * 
wg.second->spill_threshold_high_water_mark() * 1.0 / 100);
         int64_t weighted_high_water_mark_limit =
-                wg_weighted_mem_limit * 
wg.second->spill_threshold_high_water_mark() / 100;
+                (int64_t)(wg_weighted_mem_limit * 
wg.second->spill_threshold_high_water_mark() *
+                          1.0 / 100);
         std::string debug_msg;
         if (is_high_wartermark || is_low_wartermark) {
             debug_msg = fmt::format(
@@ -230,7 +233,6 @@ void WorkloadGroupMgr::refresh_wg_weighted_memory_limit() {
                         PrettyPrinter::print(query_mem_tracker->limit(), 
TUnit::BYTES),
                         
PrettyPrinter::print(query_mem_tracker->peak_consumption(), TUnit::BYTES));
             }
-            continue;
         }
 
         // If the wg enable over commit memory, then it is no need to update 
query memlimit
@@ -263,20 +265,21 @@ void WorkloadGroupMgr::refresh_wg_weighted_memory_limit() {
                             << " enabled hard limit, but the slot count < 1, 
could not take affect";
                 } else {
                     // If the query enable hard limit, then not use weighted 
info any more, just use the settings limit.
-                    query_weighted_mem_limit =
-                            (wg_high_water_mark_limit * 
query_ctx->get_slot_count()) /
-                            total_slot_count;
+                    query_weighted_mem_limit = 
(int64_t)((wg_high_water_mark_limit *
+                                                          
query_ctx->get_slot_count() * 1.0) /
+                                                         total_slot_count);
                 }
             } else {
                 // If low water mark is not reached, then use process memory 
limit as query memory limit.
                 // It means it will not take effect.
                 if (!is_low_wartermark) {
-                    query_weighted_mem_limit = process_memory_limit;
+                    query_weighted_mem_limit = wg_high_water_mark_limit;
                 } else {
                     query_weighted_mem_limit =
                             total_used_slot_count > 0
-                                    ? (wg_high_water_mark_limit + 
total_used_slot_count) *
-                                              query_ctx->get_slot_count() / 
total_used_slot_count
+                                    ? (int64_t)((wg_high_water_mark_limit + 
total_used_slot_count) *
+                                                query_ctx->get_slot_count() * 
1.0 /
+                                                total_used_slot_count)
                                     : wg_high_water_mark_limit;
                 }
             }
@@ -407,8 +410,6 @@ void WorkloadGroupMgr::handle_paused_queries() {
         MemTableMemoryLimiter* memtable_limiter =
                 doris::ExecEnv::GetInstance()->memtable_memory_limiter();
         // Not use memlimit, should use high water mark.
-        int64_t wg_high_water_mark_limit =
-                wg->memory_limit() * wg->spill_threshold_high_water_mark() / 
100;
         int64_t memtable_active_bytes = 0;
         int64_t memtable_write_bytes = 0;
         int64_t memtable_flush_bytes = 0;
@@ -418,21 +419,20 @@ void WorkloadGroupMgr::handle_paused_queries() {
         // For example, streamload, it will not reserve many memory, but it 
will occupy many memtable memory.
         // TODO: 0.2 should be a workload group properties. For example, the 
group is optimized for load,then the value
         // should be larged, if the group is optimized for query, then the 
value should be smaller.
-        int64_t max_wg_memtable_bytes =
-                std::max<int64_t>(100 * 1024 * 1024, 
(int64_t)(wg_high_water_mark_limit * 0.2));
+        int64_t max_wg_memtable_bytes = wg->load_buffer_limit();
         if (memtable_active_bytes + memtable_write_bytes + 
memtable_flush_bytes >
             max_wg_memtable_bytes) {
             // There are many table in flush queue, just waiting them flush 
finished.
-            if (memtable_write_bytes + memtable_flush_bytes > 
max_wg_memtable_bytes / 2) {
+            if (memtable_active_bytes < (int64_t)(max_wg_memtable_bytes * 
0.8)) {
                 LOG_EVERY_T(INFO, 60)
                         << wg->name() << " load memtable size is: " << 
memtable_active_bytes << ", "
                         << memtable_write_bytes << ", " << memtable_flush_bytes
                         << ", wait for flush finished to release more memory";
                 continue;
             } else {
-                // Flush 50% active bytes, it means flush some 
memtables(currently written) to flush queue.
+                // Flush some memtables(currently written) to flush queue.
                 memtable_limiter->flush_workload_group_memtables(
-                        wg->id(), (int64_t)(memtable_active_bytes * 0.5));
+                        wg->id(), memtable_active_bytes - 
(int64_t)(max_wg_memtable_bytes * 0.8));
                 LOG_EVERY_T(INFO, 60)
                         << wg->name() << " load memtable size is: " << 
memtable_active_bytes << ", "
                         << memtable_write_bytes << ", " << memtable_flush_bytes
diff --git a/be/src/util/mem_info.cpp b/be/src/util/mem_info.cpp
index d98cd2a3664..988a0ec8626 100644
--- a/be/src/util/mem_info.cpp
+++ b/be/src/util/mem_info.cpp
@@ -208,8 +208,9 @@ void MemInfo::refresh_proc_meminfo() {
             _s_cgroup_mem_usage = cgroup_mem_usage;
             // wait 10s, 100 * 100ms, avoid too frequently.
             _s_cgroup_mem_refresh_wait_times = -100;
-            LOG(INFO) << "Refresh cgroup memory win, refresh again after 10s, 
cgroup mem limit: "
-                      << _s_cgroup_mem_limit << ", cgroup mem usage: " << 
_s_cgroup_mem_usage;
+            LOG(INFO)
+                    << "Refresh cgroup memory success, refresh again after 
10s, cgroup mem limit: "
+                    << _s_cgroup_mem_limit << ", cgroup mem usage: " << 
_s_cgroup_mem_usage;
         } else {
             // find cgroup failed, wait 300s, 1000 * 100ms.
             _s_cgroup_mem_refresh_wait_times = -3000;


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to