This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch spill_and_reserve
in repository https://gitbox.apache.org/repos/asf/doris.git

commit 43c8027347f5623188bbe4cde0adbfb92eea09be
Author: yiguolei <[email protected]>
AuthorDate: Fri Sep 20 16:17:45 2024 +0800

    f
---
 be/src/olap/memtable_memory_limiter.cpp            | 41 +---------------------
 be/src/olap/memtable_memory_limiter.h              |  6 ++--
 be/src/runtime/workload_group/workload_group.h     | 13 +++++--
 .../workload_group/workload_group_manager.cpp      | 36 +++++++++++++++----
 .../workload_group/workload_group_manager.h        |  8 +++++
 5 files changed, 51 insertions(+), 53 deletions(-)

diff --git a/be/src/olap/memtable_memory_limiter.cpp 
b/be/src/olap/memtable_memory_limiter.cpp
index 9f8a01b597d..04a9d180700 100644
--- a/be/src/olap/memtable_memory_limiter.cpp
+++ b/be/src/olap/memtable_memory_limiter.cpp
@@ -72,10 +72,6 @@ Status MemTableMemoryLimiter::init(int64_t 
process_mem_limit) {
 
 void MemTableMemoryLimiter::register_writer(std::weak_ptr<MemTableWriter> 
writer) {
     std::lock_guard<std::mutex> l(_lock);
-    _workload_groups.insert(
-            std::pair {writer->workload_group_id(),
-                       
ExecEnv::GetInstance()->workload_group_mgr()->get_task_group_by_id(
-                               writer->workload_group_id())});
     _writers.push_back(writer);
 }
 
@@ -235,7 +231,7 @@ void MemTableMemoryLimiter::refresh_mem_tracker() {
     }
 }
 
-struct WgMemUsage {
+struct MemtableUsage {
     int64_t active_mem_usage = 0;
     int64_t write_mem_usage = 0;
     int64_t flush_mem_usage = 0;
@@ -275,23 +271,6 @@ void MemTableMemoryLimiter::_refresh_mem_tracker() {
             _writers.pop_back();
         }
     }
-    std::vector<uint64_t> invalid_wgs;
-    for (auto it = _workload_groups.begin(); it != _workload_groups.end(); 
++it) {
-        auto wg = it->second.lock();
-        if (wg == nullptr) {
-            // wg is removed
-            invalid_wgs.push_back(it->first);
-            continue;
-        }
-        auto wg_mem_usage = wg_mem_usages.find(it->first);
-        if (wg_mem_usage == wg_mem_usages.end()) {
-            wg->update_load_mem_usage(0, 0, 0);
-        } else {
-            wg->update_load_mem_usage(wg_mem_usage->second.active_mem_usage,
-                                      wg_mem_usage->second.write_mem_usage,
-                                      wg_mem_usage->second.flush_mem_usage);
-        }
-    }
     _mem_usage = _flush_mem_usage + _write_mem_usage;
     g_memtable_active_memory.set_value(_active_mem_usage);
     g_memtable_write_memory.set_value(_write_mem_usage);
@@ -304,22 +283,4 @@ void MemTableMemoryLimiter::_refresh_mem_tracker() {
     }
 }
 
-void MemTableMemoryLimiter::get_workload_group_memtable_usage(uint64_t wg_id, 
int64_t* active_bytes,
-                                                              int64_t* 
write_bytes,
-                                                              int64_t* 
flush_bytes) {
-    *active_bytes = 0;
-    *write_bytes = 0;
-    *flush_bytes = 0;
-    std::unique_lock<std::mutex> l(_lock);
-    for (auto it = _writers.begin(); it != _writers.end(); ++it) {
-        if (auto writer = it->lock()) {
-            if (writer->wg_id() == wg_id) {
-                *active_bytes += writer->active_memtable_mem_consumption();
-                *write_bytes += 
writer->mem_consumption(MemType::WRITE_FINISHED);
-                *flush_bytes += writer->mem_consumption(MemType::FLUSH);
-            }
-        }
-    }
-}
-
 } // namespace doris
diff --git a/be/src/olap/memtable_memory_limiter.h 
b/be/src/olap/memtable_memory_limiter.h
index c52eef7f9bd..007eda4506e 100644
--- a/be/src/olap/memtable_memory_limiter.h
+++ b/be/src/olap/memtable_memory_limiter.h
@@ -39,6 +39,8 @@ public:
 
     // check if the total mem consumption exceeds limit.
     // If yes, it will flush memtable to try to reduce memory consumption.
+    // Every write operation will call this API to check whether it needs to flush 
the memtable OR hang
+    // when memory is not available.
     void handle_memtable_flush();
 
     void flush_workload_group_memtables(uint64_t wg_id, int64_t 
need_flush_bytes) {}
@@ -51,9 +53,6 @@ public:
 
     int64_t mem_usage() const { return _mem_usage; }
 
-    void get_workload_group_memtable_usage(uint64_t wg_id, int64_t* 
active_bytes,
-                                           int64_t* write_bytes, int64_t* 
flush_bytes);
-
 private:
     static inline bool _sys_avail_mem_less_than_warning_water_mark();
     static inline bool _process_used_mem_more_than_soft_mem_limit();
@@ -85,6 +84,5 @@ private:
 
     std::vector<std::weak_ptr<MemTableWriter>> _writers;
     std::vector<std::weak_ptr<MemTableWriter>> _active_writers;
-    std::map<uint64_t, std::weak_ptr<WorkloadGroup>> _workload_groups;
 };
 } // namespace doris
diff --git a/be/src/runtime/workload_group/workload_group.h 
b/be/src/runtime/workload_group/workload_group.h
index 16c3cac2a03..f8ac9bb3a59 100644
--- a/be/src/runtime/workload_group/workload_group.h
+++ b/be/src/runtime/workload_group/workload_group.h
@@ -137,9 +137,16 @@ public:
 
     void update_load_mem_usage(int64_t active_bytes, int64_t write_bytes, 
int64_t flush_bytes) {
         std::unique_lock<std::shared_mutex> wlock(_mutex);
-        _active_mem_usage = active_bytes;
-        _write_mem_usage = write_bytes;
-        _flush_mem_usage = flush_bytes;
+        _active_mem_usage = _active_mem_usage;
+        _write_mem_usage = _write_mem_usage;
+        _flush_mem_usage = _flush_mem_usage;
+    }
+
+    void get_load_mem_usage(int64_t* active_bytes, int64_t* write_bytes, 
int64_t* flush_bytes) {
+        std::shared_lock<std::shared_mutex> r_lock(_mutex);
+        *active_bytes += writer->active_memtable_mem_consumption();
+        *write_bytes += writer->mem_consumption(MemType::WRITE_FINISHED);
+        *flush_bytes += writer->mem_consumption(MemType::FLUSH);
     }
 
     std::string debug_string() const;
diff --git a/be/src/runtime/workload_group/workload_group_manager.cpp 
b/be/src/runtime/workload_group/workload_group_manager.cpp
index 23209867dee..4a277ae40f1 100644
--- a/be/src/runtime/workload_group/workload_group_manager.cpp
+++ b/be/src/runtime/workload_group/workload_group_manager.cpp
@@ -412,10 +412,12 @@ void WorkloadGroupMgr::handle_paused_queries() {
         int64_t memtable_active_bytes = 0;
         int64_t memtable_write_bytes = 0;
         int64_t memtable_flush_bytes = 0;
-        memtable_limiter->get_workload_group_memtable_usage(
-                wg->id(), &memtable_active_bytes, &memtable_write_bytes, 
&memtable_flush_bytes);
+        wg->get_load_mem_usage(&memtable_active_bytes, &memtable_write_bytes,
+                               &memtable_flush_bytes);
         // TODO: should add a signal in memtable limiter to prevent new batch
+        // For example, streamload, it will not reserve much memory, but it 
will occupy much memtable memory.
+        // TODO: 0.2 should be a workload group property. For example, if the 
group is optimized for load, then the value
+        // should be larger; if the group is optimized for query, then the 
value should be smaller.
         int64_t max_wg_memtable_bytes =
                 std::max<int64_t>(100 * 1024 * 1024, wg_high_water_mark_limit 
* 0.2);
         if (memtable_active_bytes + memtable_write_bytes + 
memtable_flush_bytes >
@@ -429,15 +431,21 @@ void WorkloadGroupMgr::handle_paused_queries() {
                 continue;
             } else {
                 // Flush 50% active bytes, it means flush some 
memtables(currently written) to flush queue.
-                memtable_limiter->flush_workload_group_memtables(wg->wg_id, );
+                memtable_limiter->flush_workload_group_memtables(wg->wg_id(),
+                                                                 
memtable_active_bytes * 0.5);
+                LOG_EVERY_T(INFO, 60)
+                        << wg->name() << " load memtable size is: " << 
memtable_active_bytes << ", "
+                        << memtable_write_bytes << ", " << memtable_flush_bytes
+                        << ", flush half of active memtable to revoke memory";
                 continue;
             }
         }
 
         // If the wg enable memory overcommit, then not spill, just cancel 
query.
-        if (wg->enable_memory_overcommit()) {
-            continue;
-        }
+        // if (wg->enable_memory_overcommit()) {
+        // TODO  should cancel top query here.
+        //    continue;
+        //}
 
         std::shared_ptr<QueryContext> max_revocable_query;
         std::shared_ptr<QueryContext> max_memory_usage_query;
@@ -542,4 +550,20 @@ void WorkloadGroupMgr::stop() {
     }
 }
 
+void WorkloadGroupMgr::update_load_memtable_usage(
+        const std::map<uint64_t, MemtableUsage>& wg_memtable_usages) {
+    // Use readlock here, because it will not modify workload_groups
+    std::shared_lock<std::shared_mutex> r_lock(_group_mutex);
+    for (auto it = _workload_groups.begin(); it != _workload_groups.end(); 
++it) {
+        auto wg_usage = wg_memtable_usages.find(it->first);
+        if (wg_usage != wg_memtable_usages.end()) {
+            it->second->update_load_mem_usage(wg_usage->active_mem_usage, 
wg_usage->write_mem_usage,
+                                              wg_usage->flush_mem_usage);
+        } else {
+            // Not anything in memtable limiter, then set to 0
+            it->second->update_load_mem_usage(0, 0, 0);
+        }
+    }
+}
+
 } // namespace doris
diff --git a/be/src/runtime/workload_group/workload_group_manager.h 
b/be/src/runtime/workload_group/workload_group_manager.h
index 68d4f932de0..9d8b0744409 100644
--- a/be/src/runtime/workload_group/workload_group_manager.h
+++ b/be/src/runtime/workload_group/workload_group_manager.h
@@ -62,6 +62,12 @@ private:
     std::string query_id_;
 };
 
+struct MemtableUsage {
+    int64_t active_mem_usage = 0;
+    int64_t write_mem_usage = 0;
+    int64_t flush_mem_usage = 0;
+};
+
 class WorkloadGroupMgr {
 public:
     WorkloadGroupMgr() = default;
@@ -92,6 +98,8 @@ public:
 
     void handle_paused_queries();
 
+    void update_load_memtable_usage(const std::map<uint64_t, MemtableUsage>& 
wg_memtable_usages);
+
 private:
     std::shared_mutex _group_mutex;
     std::unordered_map<uint64_t, WorkloadGroupPtr> _workload_groups;


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to