This is an automated email from the ASF dual-hosted git repository. yiguolei pushed a commit to branch spill_and_reserve in repository https://gitbox.apache.org/repos/asf/doris.git
commit 43c8027347f5623188bbe4cde0adbfb92eea09be Author: yiguolei <[email protected]> AuthorDate: Fri Sep 20 16:17:45 2024 +0800 f --- be/src/olap/memtable_memory_limiter.cpp | 41 +--------------------- be/src/olap/memtable_memory_limiter.h | 6 ++-- be/src/runtime/workload_group/workload_group.h | 13 +++++-- .../workload_group/workload_group_manager.cpp | 36 +++++++++++++++---- .../workload_group/workload_group_manager.h | 8 +++++ 5 files changed, 51 insertions(+), 53 deletions(-) diff --git a/be/src/olap/memtable_memory_limiter.cpp b/be/src/olap/memtable_memory_limiter.cpp index 9f8a01b597d..04a9d180700 100644 --- a/be/src/olap/memtable_memory_limiter.cpp +++ b/be/src/olap/memtable_memory_limiter.cpp @@ -72,10 +72,6 @@ Status MemTableMemoryLimiter::init(int64_t process_mem_limit) { void MemTableMemoryLimiter::register_writer(std::weak_ptr<MemTableWriter> writer) { std::lock_guard<std::mutex> l(_lock); - _workload_groups.insert( - std::pair {writer->workload_group_id(), - ExecEnv::GetInstance()->workload_group_mgr()->get_task_group_by_id( - writer->workload_group_id())}); _writers.push_back(writer); } @@ -235,7 +231,7 @@ void MemTableMemoryLimiter::refresh_mem_tracker() { } } -struct WgMemUsage { +struct MemtableUsage { int64_t active_mem_usage = 0; int64_t write_mem_usage = 0; int64_t flush_mem_usage = 0; @@ -275,23 +271,6 @@ void MemTableMemoryLimiter::_refresh_mem_tracker() { _writers.pop_back(); } } - std::vector<uint64_t> invalid_wgs; - for (auto it = _workload_groups.begin(); it != _workload_groups.end(); ++it) { - auto wg = it->second.lock(); - if (wg == nullptr) { - // wg is removed - invalid_wgs.push_back(it->first); - continue; - } - auto wg_mem_usage = wg_mem_usages.find(it->first); - if (wg_mem_usage == wg_mem_usages.end()) { - wg->update_load_mem_usage(0, 0, 0); - } else { - wg->update_load_mem_usage(wg_mem_usage->second.active_mem_usage, - wg_mem_usage->second.write_mem_usage, - wg_mem_usage->second.flush_mem_usage); - } - } _mem_usage = 
_flush_mem_usage + _write_mem_usage; g_memtable_active_memory.set_value(_active_mem_usage); g_memtable_write_memory.set_value(_write_mem_usage); @@ -304,22 +283,4 @@ void MemTableMemoryLimiter::_refresh_mem_tracker() { } } -void MemTableMemoryLimiter::get_workload_group_memtable_usage(uint64_t wg_id, int64_t* active_bytes, - int64_t* write_bytes, - int64_t* flush_bytes) { - *active_bytes = 0; - *write_bytes = 0; - *flush_bytes = 0; - std::unique_lock<std::mutex> l(_lock); - for (auto it = _writers.begin(); it != _writers.end(); ++it) { - if (auto writer = it->lock()) { - if (writer->wg_id() == wg_id) { - *active_bytes += writer->active_memtable_mem_consumption(); - *write_bytes += writer->mem_consumption(MemType::WRITE_FINISHED); - *flush_bytes += writer->mem_consumption(MemType::FLUSH); - } - } - } -} - } // namespace doris diff --git a/be/src/olap/memtable_memory_limiter.h b/be/src/olap/memtable_memory_limiter.h index c52eef7f9bd..007eda4506e 100644 --- a/be/src/olap/memtable_memory_limiter.h +++ b/be/src/olap/memtable_memory_limiter.h @@ -39,6 +39,8 @@ public: // check if the total mem consumption exceeds limit. // If yes, it will flush memtable to try to reduce memory consumption. + // Every write operation will call this API to check if need flush memtable OR hang + // when memory is not available. 
void handle_memtable_flush(); void flush_workload_group_memtables(uint64_t wg_id, int64_t need_flush_bytes) {} @@ -51,9 +53,6 @@ public: int64_t mem_usage() const { return _mem_usage; } - void get_workload_group_memtable_usage(uint64_t wg_id, int64_t* active_bytes, - int64_t* write_bytes, int64_t* flush_bytes); - private: static inline bool _sys_avail_mem_less_than_warning_water_mark(); static inline bool _process_used_mem_more_than_soft_mem_limit(); @@ -85,6 +84,5 @@ private: std::vector<std::weak_ptr<MemTableWriter>> _writers; std::vector<std::weak_ptr<MemTableWriter>> _active_writers; - std::map<uint64_t, std::weak_ptr<WorkloadGroup>> _workload_groups; }; } // namespace doris diff --git a/be/src/runtime/workload_group/workload_group.h b/be/src/runtime/workload_group/workload_group.h index 16c3cac2a03..f8ac9bb3a59 100644 --- a/be/src/runtime/workload_group/workload_group.h +++ b/be/src/runtime/workload_group/workload_group.h @@ -137,9 +137,16 @@ public: void update_load_mem_usage(int64_t active_bytes, int64_t write_bytes, int64_t flush_bytes) { std::unique_lock<std::shared_mutex> wlock(_mutex); - _active_mem_usage = active_bytes; - _write_mem_usage = write_bytes; - _flush_mem_usage = flush_bytes; + _active_mem_usage = _active_mem_usage; + _write_mem_usage = _write_mem_usage; + _flush_mem_usage = _flush_mem_usage; + } + + void get_load_mem_usage(int64_t* active_bytes, int64_t* write_bytes, int64_t* flush_bytes) { + std::shared_lock<std::shared_mutex> r_lock(_mutex); + *active_bytes += writer->active_memtable_mem_consumption(); + *write_bytes += writer->mem_consumption(MemType::WRITE_FINISHED); + *flush_bytes += writer->mem_consumption(MemType::FLUSH); } std::string debug_string() const; diff --git a/be/src/runtime/workload_group/workload_group_manager.cpp b/be/src/runtime/workload_group/workload_group_manager.cpp index 23209867dee..4a277ae40f1 100644 --- a/be/src/runtime/workload_group/workload_group_manager.cpp +++ 
b/be/src/runtime/workload_group/workload_group_manager.cpp @@ -412,10 +412,12 @@ void WorkloadGroupMgr::handle_paused_queries() { int64_t memtable_active_bytes = 0; int64_t memtable_write_bytes = 0; int64_t memtable_flush_bytes = 0; - memtable_limiter->get_workload_group_memtable_usage( - wg->id(), &memtable_active_bytes, &memtable_write_bytes, &memtable_flush_bytes); + wg->get_load_mem_usage(&memtable_active_bytes, &memtable_write_bytes, + &memtable_flush_bytes); // TODO: should add a signal in memtable limiter to prevent new batch // For example, streamload, it will not reserve many memory, but it will occupy many memtable memory. + // TODO: 0.2 should be a workload group property. For example, if the group is optimized for load, then the value + // should be larger; if the group is optimized for query, then the value should be smaller. int64_t max_wg_memtable_bytes = std::max<int64_t>(100 * 1024 * 1024, wg_high_water_mark_limit * 0.2); if (memtable_active_bytes + memtable_write_bytes + memtable_flush_bytes > @@ -429,15 +431,21 @@ void WorkloadGroupMgr::handle_paused_queries() { continue; } else { // Flush 50% active bytes, it means flush some memtables(currently written) to flush queue. - memtable_limiter->flush_workload_group_memtables(wg->wg_id, ); + memtable_limiter->flush_workload_group_memtables(wg->wg_id(), + memtable_active_bytes * 0.5); + LOG_EVERY_T(INFO, 60) + << wg->name() << " load memtable size is: " << memtable_active_bytes << ", " + << memtable_write_bytes << ", " << memtable_flush_bytes + << ", flush half of active memtable to revoke memory"; continue; } } // If the wg enable memory overcommit, then not spill, just cancel query. - if (wg->enable_memory_overcommit()) { - continue; - } + // if (wg->enable_memory_overcommit()) { + // TODO should cancel top query here. 
+ // continue; + //} std::shared_ptr<QueryContext> max_revocable_query; std::shared_ptr<QueryContext> max_memory_usage_query; @@ -542,4 +550,20 @@ void WorkloadGroupMgr::stop() { } } +void WorkloadGroupMgr::update_load_memtable_usage( + const std::map<uint64_t, MemtableUsage>& wg_memtable_usages) { + // Use readlock here, because it will not modify workload_groups + std::shared_lock<std::shared_mutex> r_lock(_group_mutex); + for (auto it = _workload_groups.begin(); it != _workload_groups.end(); ++it) { + auto wg_usage = wg_memtable_usages.find(it->first); + if (wg_usage != wg_memtable_usages.end()) { + it->second->update_load_mem_usage(wg_usage->active_mem_usage, wg_usage->write_mem_usage, + wg_usage->flush_mem_usage); + } else { + // Not anything in memtable limiter, then set to 0 + it->second->update_load_mem_usage(0, 0, 0); + } + } +} + } // namespace doris diff --git a/be/src/runtime/workload_group/workload_group_manager.h b/be/src/runtime/workload_group/workload_group_manager.h index 68d4f932de0..9d8b0744409 100644 --- a/be/src/runtime/workload_group/workload_group_manager.h +++ b/be/src/runtime/workload_group/workload_group_manager.h @@ -62,6 +62,12 @@ private: std::string query_id_; }; +struct MemtableUsage { + int64_t active_mem_usage = 0; + int64_t write_mem_usage = 0; + int64_t flush_mem_usage = 0; +}; + class WorkloadGroupMgr { public: WorkloadGroupMgr() = default; @@ -92,6 +98,8 @@ public: void handle_paused_queries(); + void update_load_memtable_usage(const std::map<uint64_t, MemtableUsage>& wg_memtable_usages); + private: std::shared_mutex _group_mutex; std::unordered_map<uint64_t, WorkloadGroupPtr> _workload_groups; --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
