This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch spill_and_reserve
in repository https://gitbox.apache.org/repos/asf/doris.git

commit 4a4cfeeee2c8c783c6140ed5e40a5d61e4e31773
Author: yiguolei <[email protected]>
AuthorDate: Thu Sep 19 17:38:06 2024 +0800

    f
---
 be/src/olap/memtable_memory_limiter.cpp           | 62 +++++++++++++++++++++-
 be/src/olap/memtable_memory_limiter.h             |  3 +-
 be/src/runtime/workload_group/workload_group.h    | 13 +++++
 .../workload_group/workload_group_manager.cpp     |  1 +
 4 files changed, 76 insertions(+), 3 deletions(-)

diff --git a/be/src/olap/memtable_memory_limiter.cpp b/be/src/olap/memtable_memory_limiter.cpp
index 9b9ce19f895..9f8a01b597d 100644
--- a/be/src/olap/memtable_memory_limiter.cpp
+++ b/be/src/olap/memtable_memory_limiter.cpp
@@ -72,6 +72,10 @@ Status MemTableMemoryLimiter::init(int64_t process_mem_limit) {
 
 void MemTableMemoryLimiter::register_writer(std::weak_ptr<MemTableWriter> writer) {
     std::lock_guard<std::mutex> l(_lock);
+    _workload_groups.insert(
+            std::pair {writer->workload_group_id(),
+                       ExecEnv::GetInstance()->workload_group_mgr()->get_task_group_by_id(
+                               writer->workload_group_id())});
     _writers.push_back(writer);
 }
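
(For orientation: the hunk above caches, per workload group id, a weak reference to the
group at writer-registration time, so that later refreshes can publish per-group usage.
Below is a minimal, self-contained sketch of that pattern; MemTableWriter and
WorkloadGroup are reduced to stand-ins, the group is passed in directly instead of being
resolved through ExecEnv's workload_group_mgr(), and the writer weak_ptr is locked before
it is dereferenced, since a std::weak_ptr cannot be used through operator->.)

    #include <cstdint>
    #include <map>
    #include <memory>
    #include <mutex>
    #include <vector>

    // Illustrative stand-ins only, not the real Doris classes.
    struct WorkloadGroup {};
    struct MemTableWriter {
        uint64_t workload_group_id() const { return wg_id; }
        uint64_t wg_id = 0;
    };

    class LimiterSketch {
    public:
        // Remember the writer and cache a weak reference to its workload group,
        // keyed by group id, so a later refresh can publish per-group usage.
        void register_writer(std::weak_ptr<MemTableWriter> writer,
                             std::weak_ptr<WorkloadGroup> wg) {
            std::lock_guard<std::mutex> l(_lock);
            if (auto w = writer.lock()) { // a weak_ptr must be locked before use
                _workload_groups.emplace(w->workload_group_id(), std::move(wg));
                _writers.push_back(std::move(writer));
            }
        }

    private:
        std::mutex _lock;
        std::vector<std::weak_ptr<MemTableWriter>> _writers;
        std::map<uint64_t, std::weak_ptr<WorkloadGroup>> _workload_groups;
    };
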
@@ -231,27 +235,63 @@ void MemTableMemoryLimiter::refresh_mem_tracker() {
     }
 }
 
+struct WgMemUsage {
+    int64_t active_mem_usage = 0;
+    int64_t write_mem_usage = 0;
+    int64_t flush_mem_usage = 0;
+};
+
 void MemTableMemoryLimiter::_refresh_mem_tracker() {
     _flush_mem_usage = 0;
     _write_mem_usage = 0;
     _active_mem_usage = 0;
+    std::map<uint64_t, WgMemUsage> wg_mem_usages;
     _active_writers.clear();
     for (auto it = _writers.begin(); it != _writers.end();) {
         if (auto writer = it->lock()) {
+            if (wg_mem_usages.find(writer->workload_group_id()) == wg_mem_usages.end()) {
+                wg_mem_usages.insert({writer->workload_group_id(), {0, 0, 0}});
+            }
+            auto& wg_mem_usage = wg_mem_usages.find(writer->workload_group_id())->second;
+            // The memtable is currently used by writer to insert blocks.
             auto active_usage = writer->active_memtable_mem_consumption();
+            wg_mem_usage.active_mem_usage += active_usage;
             _active_mem_usage += active_usage;
             if (active_usage > 0) {
                 _active_writers.push_back(writer);
             }
-            _flush_mem_usage += writer->mem_consumption(MemType::FLUSH);
-            _write_mem_usage += writer->mem_consumption(MemType::WRITE_FINISHED);
+
+            auto flush_usage = writer->mem_consumption(MemType::FLUSH);
+            wg_mem_usage.flush_mem_usage += flush_usage;
+            _flush_mem_usage += flush_usage;
+
+            auto write_usage = writer->mem_consumption(MemType::WRITE_FINISHED);
+            wg_mem_usage.write_mem_usage += write_usage;
+            _write_mem_usage += write_usage;
             ++it;
         } else {
             *it = std::move(_writers.back());
             _writers.pop_back();
         }
     }
+    std::vector<uint64_t> invalid_wgs;
+    for (auto it = _workload_groups.begin(); it != _workload_groups.end(); ++it) {
+        auto wg = it->second.lock();
+        if (wg == nullptr) {
+            // wg is removed
+            invalid_wgs.push_back(it->first);
+            continue;
+        }
+        auto wg_mem_usage = wg_mem_usages.find(it->first);
+        if (wg_mem_usage == wg_mem_usages.end()) {
+            wg->update_load_mem_usage(0, 0, 0);
+        } else {
+            wg->update_load_mem_usage(wg_mem_usage->second.active_mem_usage,
+                                      wg_mem_usage->second.write_mem_usage,
+                                      wg_mem_usage->second.flush_mem_usage);
+        }
+    }
     _mem_usage = _flush_mem_usage + _write_mem_usage;
     g_memtable_active_memory.set_value(_active_mem_usage);
     g_memtable_write_memory.set_value(_write_mem_usage);
@@ -264,4 +304,22 @@ void MemTableMemoryLimiter::_refresh_mem_tracker() {
     }
 }
 
+void MemTableMemoryLimiter::get_workload_group_memtable_usage(uint64_t wg_id, int64_t* active_bytes,
+                                                              int64_t* write_bytes,
+                                                              int64_t* flush_bytes) {
+    *active_bytes = 0;
+    *write_bytes = 0;
+    *flush_bytes = 0;
+    std::unique_lock<std::mutex> l(_lock);
+    for (auto it = _writers.begin(); it != _writers.end(); ++it) {
+        if (auto writer = it->lock()) {
+            if (writer->wg_id() == wg_id) {
+                *active_bytes += writer->active_memtable_mem_consumption();
+                *write_bytes += writer->mem_consumption(MemType::WRITE_FINISHED);
+                *flush_bytes += writer->mem_consumption(MemType::FLUSH);
+            }
+        }
+    }
+}
+
 } // namespace doris
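
(Stripped of the tracker lock, metrics, and limiter bookkeeping, the per-workload-group
accounting added to _refresh_mem_tracker reduces to the shape sketched below. The types
are hypothetical stand-ins for MemTableWriter and WorkloadGroup; only the
aggregate-then-publish flow mirrors the hunks above.)

    #include <cstdint>
    #include <map>
    #include <memory>
    #include <utility>
    #include <vector>

    struct WgMemUsage {
        int64_t active_mem_usage = 0;
        int64_t write_mem_usage = 0;
        int64_t flush_mem_usage = 0;
    };

    // Stand-ins for the real writer/group classes.
    struct WriterStub {
        uint64_t wg_id = 0;
        int64_t active = 0, write = 0, flush = 0;
    };
    struct WorkloadGroupStub {
        WgMemUsage load_usage;
        void update_load_mem_usage(int64_t a, int64_t w, int64_t f) { load_usage = {a, w, f}; }
    };

    // Sum memtable usage per workload group while dropping writers that are gone,
    // then push the totals to every cached group (zeroing groups with no writers).
    void refresh(std::vector<std::weak_ptr<WriterStub>>& writers,
                 std::map<uint64_t, std::weak_ptr<WorkloadGroupStub>>& groups) {
        std::map<uint64_t, WgMemUsage> wg_mem_usages;
        for (auto it = writers.begin(); it != writers.end();) {
            if (auto w = it->lock()) {
                auto& usage = wg_mem_usages[w->wg_id]; // default-constructs {0, 0, 0}
                usage.active_mem_usage += w->active;
                usage.write_mem_usage += w->write;
                usage.flush_mem_usage += w->flush;
                ++it;
            } else {
                // swap-and-pop removal of an expired writer, as in the hunk above
                *it = std::move(writers.back());
                writers.pop_back();
            }
        }
        for (auto& [wg_id, weak_wg] : groups) {
            if (auto wg = weak_wg.lock()) {
                auto found = wg_mem_usages.find(wg_id);
                if (found == wg_mem_usages.end()) {
                    wg->update_load_mem_usage(0, 0, 0);
                } else {
                    wg->update_load_mem_usage(found->second.active_mem_usage,
                                              found->second.write_mem_usage,
                                              found->second.flush_mem_usage);
                }
            }
        }
    }
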
diff --git a/be/src/olap/memtable_memory_limiter.h b/be/src/olap/memtable_memory_limiter.h
index 7d5a781f1e4..c52eef7f9bd 100644
--- a/be/src/olap/memtable_memory_limiter.h
+++ b/be/src/olap/memtable_memory_limiter.h
@@ -41,7 +41,7 @@ public:
     // If yes, it will flush memtable to try to reduce memory consumption.
     void handle_memtable_flush();
 
-    void flush_workload_group_memtables(uint64_t wg_id, int64_t need_flush_bytes);
+    void flush_workload_group_memtables(uint64_t wg_id, int64_t need_flush_bytes) {}
 
     void register_writer(std::weak_ptr<MemTableWriter> writer);
 
@@ -85,5 +85,6 @@ private:
     std::vector<std::weak_ptr<MemTableWriter>> _writers;
     std::vector<std::weak_ptr<MemTableWriter>> _active_writers;
+    std::map<uint64_t, std::weak_ptr<WorkloadGroup>> _workload_groups;
 };
 } // namespace doris
diff --git a/be/src/runtime/workload_group/workload_group.h b/be/src/runtime/workload_group/workload_group.h
index fa7fc787870..16c3cac2a03 100644
--- a/be/src/runtime/workload_group/workload_group.h
+++ b/be/src/runtime/workload_group/workload_group.h
@@ -135,6 +135,13 @@ public:
                                  _spill_high_watermark.load(std::memory_order_relaxed) / 100));
     }
 
+    void update_load_mem_usage(int64_t active_bytes, int64_t write_bytes, int64_t flush_bytes) {
+        std::unique_lock<std::shared_mutex> wlock(_mutex);
+        _active_mem_usage = active_bytes;
+        _write_mem_usage = write_bytes;
+        _flush_mem_usage = flush_bytes;
+    }
+
     std::string debug_string() const;
     std::string memory_debug_string() const;
 
@@ -225,6 +232,12 @@ private:
     std::string _name;
     int64_t _version;
     int64_t _memory_limit; // bytes
+
+    // memory used by load memtable
+    int64_t _active_mem_usage = 0;
+    int64_t _write_mem_usage = 0;
+    int64_t _flush_mem_usage = 0;
+
     // `weighted_memory_limit` less than or equal to _memory_limit, calculate after exclude public memory.
     // more detailed description in `refresh_wg_weighted_memory_limit`.
     std::atomic<int64_t> _weighted_memory_limit {0}; //
diff --git a/be/src/runtime/workload_group/workload_group_manager.cpp b/be/src/runtime/workload_group/workload_group_manager.cpp
index 7d71097a519..23209867dee 100644
--- a/be/src/runtime/workload_group/workload_group_manager.cpp
+++ b/be/src/runtime/workload_group/workload_group_manager.cpp
@@ -429,6 +429,7 @@ void WorkloadGroupMgr::handle_paused_queries() {
                 continue;
             } else {
                 // Flush 50% active bytes, it means flush some memtables(currently written) to flush queue.
+                memtable_limiter->flush_workload_group_memtables(wg->wg_id, );
                 continue;
             }
         }
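
(In the last hunk the second argument of flush_workload_group_memtables() is still
missing and the header version of the function is an empty stub, so the actual byte
budget is not decided yet on this work-in-progress branch. Purely as an illustration of
how the "flush 50% active bytes" comment could be wired up with the two APIs this commit
introduces, using stand-in types rather than the real limiter:)

    #include <cstdint>

    // Stand-in for the limiter interface touched by this commit; the real class is
    // MemTableMemoryLimiter in memtable_memory_limiter.h, and this wiring is hypothetical.
    struct LimiterStub {
        int64_t active = 0, write = 0, flush = 0; // fake per-group usage for the sketch
        void get_workload_group_memtable_usage(uint64_t /*wg_id*/, int64_t* a, int64_t* w,
                                               int64_t* f) {
            *a = active;
            *w = write;
            *f = flush;
        }
        void flush_workload_group_memtables(uint64_t /*wg_id*/, int64_t /*need_flush_bytes*/) {
            // still an empty stub in the commit; a real version would move memtables of
            // this workload group into the flush queue until the byte budget is met
        }
    };

    // One plausible driver for the "flush 50% active bytes" step on a paused workload group.
    void flush_half_active(LimiterStub* limiter, uint64_t wg_id) {
        int64_t active_bytes = 0, write_bytes = 0, flush_bytes = 0;
        limiter->get_workload_group_memtable_usage(wg_id, &active_bytes, &write_bytes, &flush_bytes);
        limiter->flush_workload_group_memtables(wg_id, active_bytes / 2);
    }
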
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]