This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch spill_and_reserve
in repository https://gitbox.apache.org/repos/asf/doris.git

commit 4a4cfeeee2c8c783c6140ed5e40a5d61e4e31773
Author: yiguolei <[email protected]>
AuthorDate: Thu Sep 19 17:38:06 2024 +0800

    f
---
 be/src/olap/memtable_memory_limiter.cpp            | 62 +++++++++++++++++++++-
 be/src/olap/memtable_memory_limiter.h              |  3 +-
 be/src/runtime/workload_group/workload_group.h     | 13 +++++
 .../workload_group/workload_group_manager.cpp      |  1 +
 4 files changed, 76 insertions(+), 3 deletions(-)

diff --git a/be/src/olap/memtable_memory_limiter.cpp b/be/src/olap/memtable_memory_limiter.cpp
index 9b9ce19f895..9f8a01b597d 100644
--- a/be/src/olap/memtable_memory_limiter.cpp
+++ b/be/src/olap/memtable_memory_limiter.cpp
@@ -72,6 +72,10 @@ Status MemTableMemoryLimiter::init(int64_t process_mem_limit) {
 
 void MemTableMemoryLimiter::register_writer(std::weak_ptr<MemTableWriter> writer) {
     std::lock_guard<std::mutex> l(_lock);
+    _workload_groups.insert(
+            std::pair {writer->workload_group_id(),
+                       ExecEnv::GetInstance()->workload_group_mgr()->get_task_group_by_id(
+                               writer->workload_group_id())});
     _writers.push_back(writer);
 }
 
@@ -231,27 +235,63 @@ void MemTableMemoryLimiter::refresh_mem_tracker() {
     }
 }
 
+struct WgMemUsage {
+    int64_t active_mem_usage = 0;
+    int64_t write_mem_usage = 0;
+    int64_t flush_mem_usage = 0;
+};
+
 void MemTableMemoryLimiter::_refresh_mem_tracker() {
     _flush_mem_usage = 0;
     _write_mem_usage = 0;
     _active_mem_usage = 0;
+    std::map<uint64_t, WgMemUsage> wg_mem_usages;
     _active_writers.clear();
     for (auto it = _writers.begin(); it != _writers.end();) {
         if (auto writer = it->lock()) {
+            if (wg_mem_usages.find(writer->workload_group_id()) == wg_mem_usages.end()) {
+                wg_mem_usages.insert({writer->workload_group_id(), {0, 0, 0}});
+            }
+            auto& wg_mem_usage = wg_mem_usages.find(writer->workload_group_id())->second;
+
             // The memtable is currently used by writer to insert blocks.
             auto active_usage = writer->active_memtable_mem_consumption();
+            wg_mem_usage.active_mem_usage += active_usage;
             _active_mem_usage += active_usage;
             if (active_usage > 0) {
                 _active_writers.push_back(writer);
             }
-            _flush_mem_usage += writer->mem_consumption(MemType::FLUSH);
-            _write_mem_usage += writer->mem_consumption(MemType::WRITE_FINISHED);
+
+            auto flush_usage = writer->mem_consumption(MemType::FLUSH);
+            wg_mem_usage.flush_mem_usage += flush_usage;
+            _flush_mem_usage += flush_usage;
+
+            auto write_usage = writer->mem_consumption(MemType::WRITE_FINISHED);
+            wg_mem_usage.write_mem_usage += write_usage;
+            _write_mem_usage += write_usage;
             ++it;
         } else {
             *it = std::move(_writers.back());
             _writers.pop_back();
         }
     }
+    std::vector<uint64_t> invalid_wgs;
+    for (auto it = _workload_groups.begin(); it != _workload_groups.end(); ++it) {
+        auto wg = it->second.lock();
+        if (wg == nullptr) {
+            // wg is removed
+            invalid_wgs.push_back(it->first);
+            continue;
+        }
+        auto wg_mem_usage = wg_mem_usages.find(it->first);
+        if (wg_mem_usage == wg_mem_usages.end()) {
+            wg->update_load_mem_usage(0, 0, 0);
+        } else {
+            wg->update_load_mem_usage(wg_mem_usage->second.active_mem_usage,
+                                      wg_mem_usage->second.write_mem_usage,
+                                      wg_mem_usage->second.flush_mem_usage);
+        }
+    }
     _mem_usage = _flush_mem_usage + _write_mem_usage;
     g_memtable_active_memory.set_value(_active_mem_usage);
     g_memtable_write_memory.set_value(_write_mem_usage);
@@ -264,4 +304,22 @@ void MemTableMemoryLimiter::_refresh_mem_tracker() {
     }
 }
 
+void MemTableMemoryLimiter::get_workload_group_memtable_usage(uint64_t wg_id, int64_t* active_bytes,
+                                                              int64_t* write_bytes,
+                                                              int64_t* flush_bytes) {
+    *active_bytes = 0;
+    *write_bytes = 0;
+    *flush_bytes = 0;
+    std::unique_lock<std::mutex> l(_lock);
+    for (auto it = _writers.begin(); it != _writers.end(); ++it) {
+        if (auto writer = it->lock()) {
+            if (writer->wg_id() == wg_id) {
+                *active_bytes += writer->active_memtable_mem_consumption();
+                *write_bytes += writer->mem_consumption(MemType::WRITE_FINISHED);
+                *flush_bytes += writer->mem_consumption(MemType::FLUSH);
+            }
+        }
+    }
+}
+
 } // namespace doris
diff --git a/be/src/olap/memtable_memory_limiter.h b/be/src/olap/memtable_memory_limiter.h
index 7d5a781f1e4..c52eef7f9bd 100644
--- a/be/src/olap/memtable_memory_limiter.h
+++ b/be/src/olap/memtable_memory_limiter.h
@@ -41,7 +41,7 @@ public:
     // If yes, it will flush memtable to try to reduce memory consumption.
     void handle_memtable_flush();
 
-    void flush_workload_group_memtables(uint64_t wg_id, int64_t need_flush_bytes);
+    void flush_workload_group_memtables(uint64_t wg_id, int64_t need_flush_bytes) {}
 
     void register_writer(std::weak_ptr<MemTableWriter> writer);
 
@@ -85,5 +85,6 @@ private:
 
     std::vector<std::weak_ptr<MemTableWriter>> _writers;
     std::vector<std::weak_ptr<MemTableWriter>> _active_writers;
+    std::map<uint64_t, std::weak_ptr<WorkloadGroup>> _workload_groups;
 };
 } // namespace doris
diff --git a/be/src/runtime/workload_group/workload_group.h b/be/src/runtime/workload_group/workload_group.h
index fa7fc787870..16c3cac2a03 100644
--- a/be/src/runtime/workload_group/workload_group.h
+++ b/be/src/runtime/workload_group/workload_group.h
@@ -135,6 +135,13 @@ public:
                                 _spill_high_watermark.load(std::memory_order_relaxed) / 100));
     }
 
+    void update_load_mem_usage(int64_t active_bytes, int64_t write_bytes, int64_t flush_bytes) {
+        std::unique_lock<std::shared_mutex> wlock(_mutex);
+        _active_mem_usage = active_bytes;
+        _write_mem_usage = write_bytes;
+        _flush_mem_usage = flush_bytes;
+    }
+
     std::string debug_string() const;
     std::string memory_debug_string() const;
 
@@ -225,6 +232,12 @@ private:
     std::string _name;
     int64_t _version;
     int64_t _memory_limit; // bytes
+
+    // memory used by load memtable
+    int64_t _active_mem_usage = 0;
+    int64_t _write_mem_usage = 0;
+    int64_t _flush_mem_usage = 0;
+
     // `weighted_memory_limit` less than or equal to _memory_limit, calculate after exclude public memory.
     // more detailed description in `refresh_wg_weighted_memory_limit`.
     std::atomic<int64_t> _weighted_memory_limit {0}; //
diff --git a/be/src/runtime/workload_group/workload_group_manager.cpp b/be/src/runtime/workload_group/workload_group_manager.cpp
index 7d71097a519..23209867dee 100644
--- a/be/src/runtime/workload_group/workload_group_manager.cpp
+++ b/be/src/runtime/workload_group/workload_group_manager.cpp
@@ -429,6 +429,7 @@ void WorkloadGroupMgr::handle_paused_queries() {
                 continue;
             } else {
                 // Flush 50% active bytes, it means flush some memtables(currently written) to flush queue.
+                memtable_limiter->flush_workload_group_memtables(wg->wg_id, );
                 continue;
             }
         }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to