This is an automated email from the ASF dual-hosted git repository. yiguolei pushed a commit to branch spill_and_reserve in repository https://gitbox.apache.org/repos/asf/doris.git
commit edfea8e44544a9b309ec142618e0e867dadd99a8 Author: yiguolei <[email protected]> AuthorDate: Thu Sep 19 16:46:30 2024 +0800 flush memtable before spill disk --- be/src/olap/memtable_memory_limiter.h | 8 ++++++- be/src/olap/memtable_writer.h | 8 +++++++ .../workload_group/workload_group_manager.cpp | 28 ++++++++++++++++++++++ 3 files changed, 43 insertions(+), 1 deletion(-) diff --git a/be/src/olap/memtable_memory_limiter.h b/be/src/olap/memtable_memory_limiter.h index 66f5fb2a8d0..7d5a781f1e4 100644 --- a/be/src/olap/memtable_memory_limiter.h +++ b/be/src/olap/memtable_memory_limiter.h @@ -41,6 +41,8 @@ public: // If yes, it will flush memtable to try to reduce memory consumption. void handle_memtable_flush(); + void flush_workload_group_memtables(uint64_t wg_id, int64_t need_flush_bytes); + void register_writer(std::weak_ptr<MemTableWriter> writer); void refresh_mem_tracker(); @@ -49,6 +51,9 @@ public: int64_t mem_usage() const { return _mem_usage; } + void get_workload_group_memtable_usage(uint64_t wg_id, int64_t* active_bytes, + int64_t* write_bytes, int64_t* flush_bytes); + private: static inline bool _sys_avail_mem_less_than_warning_water_mark(); static inline bool _process_used_mem_more_than_soft_mem_limit(); @@ -56,7 +61,8 @@ private: bool _soft_limit_reached(); bool _hard_limit_reached(); bool _load_usage_low(); - void _flush_active_memtables(int64_t need_flush); + // If wg_id == -1, it means not care about wgid + void _flush_active_memtables(uint64_t wg_id, int64_t need_flush); int64_t _flush_memtable(std::weak_ptr<MemTableWriter> writer_to_flush, int64_t threshold); void _refresh_mem_tracker(); diff --git a/be/src/olap/memtable_writer.h b/be/src/olap/memtable_writer.h index ec44348b4a9..ae06a04e86d 100644 --- a/be/src/olap/memtable_writer.h +++ b/be/src/olap/memtable_writer.h @@ -106,6 +106,14 @@ public: uint64_t flush_running_count() const; + uint64_t workload_group_id() const { + auto wg = _query_thread_context.wg_wptr.lock(); + if (wg != nullptr) { + return wg->id(); + } + return 0; + } + private: // push a full memtable to flush executor Status _flush_memtable_async(); diff --git a/be/src/runtime/workload_group/workload_group_manager.cpp b/be/src/runtime/workload_group/workload_group_manager.cpp index 6fe8c7a51eb..7d71097a519 100644 --- a/be/src/runtime/workload_group/workload_group_manager.cpp +++ b/be/src/runtime/workload_group/workload_group_manager.cpp @@ -404,6 +404,34 @@ void WorkloadGroupMgr::handle_paused_queries() { } // 2. If memtable size larger than 10% of wg's limit, then flush memtable and wait. + MemTableMemoryLimiter* memtable_limiter = + doris::ExecEnv::GetInstance()->memtable_memory_limiter(); + // Not use memlimit, should use high water mark. + int64_t wg_high_water_mark_limit = + wg->memory_limit() * wg->spill_threshold_high_water_mark() / 100; + int64_t memtable_active_bytes = 0; + int64_t memtable_write_bytes = 0; + int64_t memtable_flush_bytes = 0; + memtable_limiter->get_workload_group_memtable_usage( + wg->id(), &memtable_active_bytes, &memtable_write_bytes, &memtable_flush_bytes); + // TODO: should add a signal in memtable limiter to prevent new batch + // For example, streamload, it will not reserve many memory, but it will occupy many memtable memory. + int64_t max_wg_memtable_bytes = + std::max<int64_t>(100 * 1024 * 1024, wg_high_water_mark_limit * 0.2); + if (memtable_active_bytes + memtable_write_bytes + memtable_flush_bytes > + max_wg_memtable_bytes) { + // There are many table in flush queue, just waiting them flush finished. + if (memtable_write_bytes + memtable_flush_bytes > max_wg_memtable_bytes / 2) { + LOG_EVERY_T(INFO, 60) + << wg->name() << " load memtable size is: " << memtable_active_bytes << ", " + << memtable_write_bytes << ", " << memtable_flush_bytes + << ", wait for flush finished to release more memory"; + continue; + } else { + // Flush 50% active bytes, it means flush some memtables(currently written) to flush queue. + continue; + } + } // If the wg enable memory overcommit, then not spill, just cancel query. if (wg->enable_memory_overcommit()) { --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
