This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch spill_and_reserve
in repository https://gitbox.apache.org/repos/asf/doris.git

commit edfea8e44544a9b309ec142618e0e867dadd99a8
Author: yiguolei <[email protected]>
AuthorDate: Thu Sep 19 16:46:30 2024 +0800

    flush memtable before spill disk
---
 be/src/olap/memtable_memory_limiter.h              |  8 ++++++-
 be/src/olap/memtable_writer.h                      |  8 +++++++
 .../workload_group/workload_group_manager.cpp      | 28 ++++++++++++++++++++++
 3 files changed, 43 insertions(+), 1 deletion(-)

diff --git a/be/src/olap/memtable_memory_limiter.h 
b/be/src/olap/memtable_memory_limiter.h
index 66f5fb2a8d0..7d5a781f1e4 100644
--- a/be/src/olap/memtable_memory_limiter.h
+++ b/be/src/olap/memtable_memory_limiter.h
@@ -41,6 +41,8 @@ public:
     // If yes, it will flush memtable to try to reduce memory consumption.
     void handle_memtable_flush();
 
+    void flush_workload_group_memtables(uint64_t wg_id, int64_t 
need_flush_bytes);
+
     void register_writer(std::weak_ptr<MemTableWriter> writer);
 
     void refresh_mem_tracker();
@@ -49,6 +51,9 @@ public:
 
     int64_t mem_usage() const { return _mem_usage; }
 
+    void get_workload_group_memtable_usage(uint64_t wg_id, int64_t* 
active_bytes,
+                                           int64_t* write_bytes, int64_t* 
flush_bytes);
+
 private:
     static inline bool _sys_avail_mem_less_than_warning_water_mark();
     static inline bool _process_used_mem_more_than_soft_mem_limit();
@@ -56,7 +61,8 @@ private:
     bool _soft_limit_reached();
     bool _hard_limit_reached();
     bool _load_usage_low();
-    void _flush_active_memtables(int64_t need_flush);
+    // If wg_id == -1, the workload group id is ignored.
+    void _flush_active_memtables(uint64_t wg_id, int64_t need_flush);
     int64_t _flush_memtable(std::weak_ptr<MemTableWriter> writer_to_flush, 
int64_t threshold);
     void _refresh_mem_tracker();
 
diff --git a/be/src/olap/memtable_writer.h b/be/src/olap/memtable_writer.h
index ec44348b4a9..ae06a04e86d 100644
--- a/be/src/olap/memtable_writer.h
+++ b/be/src/olap/memtable_writer.h
@@ -106,6 +106,14 @@ public:
 
     uint64_t flush_running_count() const;
 
+    uint64_t workload_group_id() const {
+        auto wg = _query_thread_context.wg_wptr.lock();
+        if (wg != nullptr) {
+            return wg->id();
+        }
+        return 0;
+    }
+
 private:
     // push a full memtable to flush executor
     Status _flush_memtable_async();
diff --git a/be/src/runtime/workload_group/workload_group_manager.cpp 
b/be/src/runtime/workload_group/workload_group_manager.cpp
index 6fe8c7a51eb..7d71097a519 100644
--- a/be/src/runtime/workload_group/workload_group_manager.cpp
+++ b/be/src/runtime/workload_group/workload_group_manager.cpp
@@ -404,6 +404,34 @@ void WorkloadGroupMgr::handle_paused_queries() {
         }
 
         // 2. If memtable size larger than 10% of wg's limit, then flush 
memtable and wait.
+        MemTableMemoryLimiter* memtable_limiter =
+                doris::ExecEnv::GetInstance()->memtable_memory_limiter();
+        // Use the high water mark here rather than the raw memory limit.
+        int64_t wg_high_water_mark_limit =
+                wg->memory_limit() * wg->spill_threshold_high_water_mark() / 
100;
+        int64_t memtable_active_bytes = 0;
+        int64_t memtable_write_bytes = 0;
+        int64_t memtable_flush_bytes = 0;
+        memtable_limiter->get_workload_group_memtable_usage(
+                wg->id(), &memtable_active_bytes, &memtable_write_bytes, 
&memtable_flush_bytes);
+        // TODO: add a signal in the memtable limiter to prevent new batches.
+        // For example, a stream load does not reserve much memory, but it can occupy a lot of memtable memory.
+        int64_t max_wg_memtable_bytes =
+                std::max<int64_t>(100 * 1024 * 1024, wg_high_water_mark_limit 
* 0.2);
+        if (memtable_active_bytes + memtable_write_bytes + 
memtable_flush_bytes >
+            max_wg_memtable_bytes) {
+            // Many memtables are already in the flush queue; just wait for them to finish flushing.
+            if (memtable_write_bytes + memtable_flush_bytes > 
max_wg_memtable_bytes / 2) {
+                LOG_EVERY_T(INFO, 60)
+                        << wg->name() << " load memtable size is: " << 
memtable_active_bytes << ", "
+                        << memtable_write_bytes << ", " << memtable_flush_bytes
+                        << ", wait for flush finished to release more memory";
+                continue;
+            } else {
+                // Flush 50% of the active bytes, i.e. move some memtables (currently being written) to the flush queue.
+                continue;
+            }
+        }
 
         // If the wg enable memory overcommit, then not spill, just cancel 
query.
         if (wg->enable_memory_overcommit()) {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to