This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch branch-1.2-lts
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-1.2-lts by this push:
new 754d2d9f23 [cherrypick](memory) Pick memory GC refactor to avoid frequent invalid GC (#17526)
754d2d9f23 is described below
commit 754d2d9f23df9d56d4eeb1939febcfab553a4b1d
Author: Xinyi Zou <[email protected]>
AuthorDate: Wed Mar 8 07:27:16 2023 +0800
[cherrypick](memory) Pick memory GC refactor to avoid frequent invalid GC (#17526)
* [fix](memory) split mem usage thread and gc thread to different threads (#17213)
Ensure that the memory status is refreshed in time
Avoid frequent GC
* [fix](memory) Avoid repeating meaningless memory gc #17258
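The core of the change is a per-level GC backoff: after a full GC, both full and minor GC are paused for memory_gc_sleep_time_s; after a minor GC, only minor GC is paused, while a full GC may still trigger. Below is a minimal, self-contained C++ sketch of that scheme. The exceed_*/run_* helpers are hypothetical placeholders, not Doris APIs; in the patch they correspond to the MemInfo water-mark checks and MemInfo::process_full_gc()/process_minor_gc(), and the constants mirror the defaults in this commit.

#include <chrono>
#include <cstdint>
#include <cstdlib>
#include <iostream>
#include <thread>

// Hypothetical stand-ins for the real checks/actions (simulated memory pressure).
bool exceed_hard_limit() { return std::rand() % 10 == 0; }
bool exceed_soft_limit() { return std::rand() % 4 == 0; }
void run_full_gc() { std::cout << "full gc\n"; }
void run_minor_gc() { std::cout << "minor gc\n"; }

int main() {
    const int32_t interval_ms = 100;       // memory_maintenance_sleep_time_ms default in this patch
    const int32_t gc_sleep_ms = 1 * 1000;  // memory_gc_sleep_time_s * 1000
    int32_t full_gc_sleep_ms = 0;
    int32_t minor_gc_sleep_ms = 0;

    for (int i = 0; i < 100; ++i) {  // bounded loop for the sketch; the daemon loops forever
        std::this_thread::sleep_for(std::chrono::milliseconds(interval_ms));
        if (full_gc_sleep_ms <= 0 && exceed_hard_limit()) {
            // After a full GC, neither full nor minor GC may run until the sleep expires.
            full_gc_sleep_ms = gc_sleep_ms;
            minor_gc_sleep_ms = gc_sleep_ms;
            run_full_gc();
        } else if (minor_gc_sleep_ms <= 0 && exceed_soft_limit()) {
            // After a minor GC, only minor GC is throttled; a full GC may still trigger.
            minor_gc_sleep_ms = gc_sleep_ms;
            run_minor_gc();
        } else {
            // Count down the backoff timers only on idle iterations,
            // matching the new Daemon::memory_gc_thread loop below.
            if (full_gc_sleep_ms > 0) full_gc_sleep_ms -= interval_ms;
            if (minor_gc_sleep_ms > 0) minor_gc_sleep_ms -= interval_ms;
        }
    }
}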
---
be/src/common/config.h | 6 +-
be/src/common/daemon.cpp | 93 ++++++++++++++++++---------
be/src/common/daemon.h | 2 +
be/src/olap/lru_cache.cpp | 11 +++-
be/src/olap/lru_cache.h | 7 ++
be/src/olap/segment_loader.h | 6 ++
be/src/runtime/fragment_mgr.cpp | 16 +++++
be/src/runtime/fragment_mgr.h | 2 +
be/src/runtime/memory/mem_tracker_limiter.cpp | 17 ++++-
be/src/util/mem_info.cpp | 51 +++++++++++----
10 files changed, 161 insertions(+), 50 deletions(-)
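A second theme of the patch, visible in mem_info.cpp and mem_tracker_limiter.cpp below, is skipping work that cannot free meaningful memory: caches under 32 MB are not cleared, and queries consuming under 100 MB, or already being cancelled, are not picked by free_top_memory_query/free_top_overcommit_query. A rough, self-contained sketch of that filtering follows; CacheStat/QueryStat and the helper functions are illustrative only, not Doris types (the real sources are ChunkAllocator/StoragePageCache consumption, MemTrackerLimiter consumption, and FragmentMgr::query_is_canceled(), and the real victim selection uses a priority queue of top consumers).

#include <cstdint>
#include <iostream>
#include <string>
#include <vector>

struct CacheStat {
    std::string name;
    int64_t consumption;
};
struct QueryStat {
    std::string label;
    int64_t consumption;
    bool canceled;
};

// Only "clear" caches whose consumption exceeds a minimum, as process_cache_gc now does.
int64_t gc_caches(const std::vector<CacheStat>& caches) {
    constexpr int64_t kMinFreeSize = 32LL * 1024 * 1024;  // 32M threshold
    int64_t freed = 0;
    for (const auto& c : caches) {
        if (c.consumption <= kMinFreeSize) continue;  // too small to be worth clearing
        freed += c.consumption;                       // pretend the cache was cleared
    }
    return freed;
}

// Skip small or already-cancelled queries when choosing victims; stop once enough is freed.
int64_t pick_queries_to_cancel(const std::vector<QueryStat>& queries, int64_t min_free_mem) {
    constexpr int64_t kSmallQuery = 100LL * 1024 * 1024;  // 100M small query does not cancel
    int64_t freed = 0;
    for (const auto& q : queries) {
        if (q.consumption <= kSmallQuery || q.canceled) continue;
        std::cout << "cancel " << q.label << " (" << q.consumption << " bytes)\n";
        freed += q.consumption;
        if (freed >= min_free_mem) break;
    }
    return freed;
}

int main() {
    std::cout << gc_caches({{"chunk", 64LL << 20}, {"page", 8LL << 20}}) << " bytes from caches\n";
    pick_queries_to_cancel({{"q1", 50LL << 20, false}, {"q2", 2LL << 30, false}}, 1LL << 30);
    return 0;
}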
diff --git a/be/src/common/config.h b/be/src/common/config.h
index 5f64f97f9b..63b9bdd779 100644
--- a/be/src/common/config.h
+++ b/be/src/common/config.h
@@ -511,7 +511,11 @@ CONF_String(buffer_pool_limit, "20%");
CONF_String(buffer_pool_clean_pages_limit, "50%");
// Sleep time in milliseconds between memory maintenance iterations
-CONF_mInt32(memory_maintenance_sleep_time_ms, "500");
+CONF_mInt32(memory_maintenance_sleep_time_ms, "100");
+
+// After full gc, no longer full gc and minor gc during sleep.
+// After minor gc, no minor gc during sleep, but full gc is possible.
+CONF_mInt32(memory_gc_sleep_time_s, "1");
// Sleep time in milliseconds between load channel memory refresh iterations
CONF_mInt64(load_channel_memory_refresh_sleep_time_ms, "100");
diff --git a/be/src/common/daemon.cpp b/be/src/common/daemon.cpp
index 2bdcbd414b..01dbe847eb 100644
--- a/be/src/common/daemon.cpp
+++ b/be/src/common/daemon.cpp
@@ -204,8 +204,7 @@ void Daemon::buffer_pool_gc_thread() {
void Daemon::memory_maintenance_thread() {
int32_t interval_milliseconds = config::memory_maintenance_sleep_time_ms;
- int32_t cache_gc_interval_ms = config::cache_gc_interval_s * 1000;
- int64_t cache_gc_freed_mem = 0;
+ int64_t last_print_proc_mem = PerfCounters::get_vm_rss();
while (!_stop_background_threads_latch.wait_for(
std::chrono::milliseconds(interval_milliseconds))) {
if (!MemInfo::initialized() || !ExecEnv::GetInstance()->initialized()) {
@@ -214,56 +213,81 @@ void Daemon::memory_maintenance_thread() {
// Refresh process memory metrics.
doris::PerfCounters::refresh_proc_status();
doris::MemInfo::refresh_proc_meminfo();
+ doris::MemInfo::refresh_proc_mem_no_allocator_cache();
+
+ // Update and print memory stat when the memory changes by 100M.
+ if (abs(last_print_proc_mem - PerfCounters::get_vm_rss()) > 104857600) {
+ last_print_proc_mem = PerfCounters::get_vm_rss();
+ doris::MemTrackerLimiter::enable_print_log_process_usage();
- // Refresh allocator memory metrics.
+ // Refresh mem tracker each type counter.
+ doris::MemTrackerLimiter::refresh_global_counter();
+
+ // Refresh allocator memory metrics.
#if !defined(ADDRESS_SANITIZER) && !defined(LEAK_SANITIZER) && !defined(THREAD_SANITIZER)
- doris::MemInfo::refresh_allocator_mem();
- if (config::enable_system_metrics) {
- DorisMetrics::instance()->system_metrics()->update_allocator_metrics();
- }
+ doris::MemInfo::refresh_allocator_mem();
+ if (config::enable_system_metrics) {
+ DorisMetrics::instance()->system_metrics()->update_allocator_metrics();
+ }
#endif
- doris::MemInfo::refresh_proc_mem_no_allocator_cache();
-
- // Refresh mem tracker each type metrics.
- doris::MemTrackerLimiter::refresh_global_counter();
+ if (doris::config::memory_debug) {
+ LOG(INFO) << MemTrackerLimiter::process_mem_log_str();
+ LOG_EVERY_N(INFO, 10)
+ << doris::MemTrackerLimiter::log_process_usage_str("memory debug", false);
+ }
+ }
+ }
+}
- // If system available memory is not enough, or the process memory exceeds the limit, reduce refresh interval.
- if (doris::MemInfo::sys_mem_available() <
- doris::MemInfo::sys_mem_available_low_water_mark() ||
- doris::MemInfo::proc_mem_no_allocator_cache() >= doris::MemInfo::mem_limit()) {
+void Daemon::memory_gc_thread() {
+ int32_t interval_milliseconds = config::memory_maintenance_sleep_time_ms;
+ int32_t cache_gc_interval_ms = config::cache_gc_interval_s * 1000;
+ int32_t memory_minor_gc_sleep_time_ms = 0;
+ int32_t memory_full_gc_sleep_time_ms = 0;
+ int64_t cache_gc_freed_mem = 0;
+ while (!_stop_background_threads_latch.wait_for(
+ std::chrono::milliseconds(interval_milliseconds))) {
+ if (!MemInfo::initialized()) {
+ continue;
+ }
+ if (memory_full_gc_sleep_time_ms <= 0 &&
+ (doris::MemInfo::sys_mem_available() <
+ doris::MemInfo::sys_mem_available_low_water_mark() ||
+ doris::MemInfo::proc_mem_no_allocator_cache() >= doris::MemInfo::mem_limit())) {
+ // No longer full gc and minor gc during sleep.
+ memory_full_gc_sleep_time_ms = config::memory_gc_sleep_time_s * 1000;
+ memory_minor_gc_sleep_time_ms = config::memory_gc_sleep_time_s * 1000;
+ cache_gc_interval_ms = config::cache_gc_interval_s * 1000;
doris::MemTrackerLimiter::print_log_process_usage("process full gc", false);
- interval_milliseconds = std::min(100, config::memory_maintenance_sleep_time_ms);
if (doris::MemInfo::process_full_gc()) {
// If there is not enough memory to be gc, the process memory usage will not be printed in the next continuous gc.
doris::MemTrackerLimiter::enable_print_log_process_usage();
}
+ } else if (memory_minor_gc_sleep_time_ms <= 0 &&
+ (doris::MemInfo::sys_mem_available() <
+ doris::MemInfo::sys_mem_available_warning_water_mark() ||
+ doris::MemInfo::proc_mem_no_allocator_cache() >=
+ doris::MemInfo::soft_mem_limit())) {
+ // No minor gc during sleep, but full gc is possible.
+ memory_minor_gc_sleep_time_ms = config::memory_gc_sleep_time_s * 1000;
cache_gc_interval_ms = config::cache_gc_interval_s * 1000;
- } else if (doris::MemInfo::sys_mem_available() <
- doris::MemInfo::sys_mem_available_warning_water_mark() ||
- doris::MemInfo::proc_mem_no_allocator_cache() >=
- doris::MemInfo::soft_mem_limit()) {
doris::MemTrackerLimiter::print_log_process_usage("process minor gc", false);
- interval_milliseconds = std::min(200, config::memory_maintenance_sleep_time_ms);
if (doris::MemInfo::process_minor_gc()) {
doris::MemTrackerLimiter::enable_print_log_process_usage();
}
- cache_gc_interval_ms = config::cache_gc_interval_s * 1000;
} else {
- doris::MemTrackerLimiter::enable_print_log_process_usage();
- interval_milliseconds = config::memory_maintenance_sleep_time_ms;
- if (doris::config::memory_debug) {
- LOG_EVERY_N(WARNING, 20) << doris::MemTrackerLimiter::log_process_usage_str(
- "memory debug", false); // default 10s print once
- } else {
- LOG_EVERY_N(INFO, 10)
- << MemTrackerLimiter::process_mem_log_str(); // default 5s print once
+ if (memory_full_gc_sleep_time_ms > 0) {
+ memory_full_gc_sleep_time_ms -= interval_milliseconds;
+ }
+ if (memory_minor_gc_sleep_time_ms > 0) {
+ memory_minor_gc_sleep_time_ms -= interval_milliseconds;
}
cache_gc_interval_ms -= interval_milliseconds;
if (cache_gc_interval_ms < 0) {
cache_gc_freed_mem = 0;
doris::MemInfo::process_cache_gc(cache_gc_freed_mem);
LOG(INFO) << fmt::format("Process regular GC Cache, Free
Memory {} Bytes",
- cache_gc_freed_mem); // default 6s
print once
+ cache_gc_freed_mem);
cache_gc_interval_ms = config::cache_gc_interval_s * 1000;
}
}
@@ -464,6 +488,10 @@ void Daemon::start() {
"Daemon", "memory_maintenance_thread", [this]() {
this->memory_maintenance_thread(); },
&_memory_maintenance_thread);
CHECK(st.ok()) << st;
+ st = Thread::create(
+ "Daemon", "memory_gc_thread", [this]() { this->memory_gc_thread();
},
+ &_memory_gc_thread);
+ CHECK(st.ok()) << st;
st = Thread::create(
"Daemon", "load_channel_tracker_refresh_thread",
[this]() { this->load_channel_tracker_refresh_thread(); },
@@ -490,6 +518,9 @@ void Daemon::stop() {
if (_memory_maintenance_thread) {
_memory_maintenance_thread->join();
}
+ if (_memory_gc_thread) {
+ _memory_gc_thread->join();
+ }
if (_load_channel_tracker_refresh_thread) {
_load_channel_tracker_refresh_thread->join();
}
diff --git a/be/src/common/daemon.h b/be/src/common/daemon.h
index 53cd925ef5..2dc4fd9bc7 100644
--- a/be/src/common/daemon.h
+++ b/be/src/common/daemon.h
@@ -48,6 +48,7 @@ private:
void tcmalloc_gc_thread();
void buffer_pool_gc_thread();
void memory_maintenance_thread();
+ void memory_gc_thread();
void load_channel_tracker_refresh_thread();
void calculate_metrics_thread();
@@ -56,6 +57,7 @@ private:
// only buffer pool gc, will be removed after.
scoped_refptr<Thread> _buffer_pool_gc_thread;
scoped_refptr<Thread> _memory_maintenance_thread;
+ scoped_refptr<Thread> _memory_gc_thread;
scoped_refptr<Thread> _load_channel_tracker_refresh_thread;
scoped_refptr<Thread> _calculate_metrics_thread;
};
diff --git a/be/src/olap/lru_cache.cpp b/be/src/olap/lru_cache.cpp
index 0eb6fe53bb..81d9803ada 100644
--- a/be/src/olap/lru_cache.cpp
+++ b/be/src/olap/lru_cache.cpp
@@ -442,7 +442,8 @@ ShardedLRUCache::ShardedLRUCache(const std::string& name, size_t total_capacity,
_num_shard_bits(Bits::FindLSBSetNonZero(num_shards)),
_num_shards(num_shards),
_shards(nullptr),
- _last_id(1) {
+ _last_id(1),
+ _total_capacity(total_capacity) {
_mem_tracker = std::make_unique<MemTrackerLimiter>(MemTrackerLimiter::Type::GLOBAL, name);
CHECK(num_shards > 0) << "num_shards cannot be 0";
CHECK_EQ((num_shards & (num_shards - 1)), 0)
@@ -534,6 +535,14 @@ int64_t ShardedLRUCache::mem_consumption() {
return _mem_tracker->consumption();
}
+int64_t ShardedLRUCache::get_usage() {
+ size_t total_usage = 0;
+ for (int i = 0; i < _num_shards; i++) {
+ total_usage += _shards[i]->get_usage();
+ }
+ return total_usage;
+}
+
void ShardedLRUCache::update_cache_metrics() const {
size_t total_capacity = 0;
size_t total_usage = 0;
diff --git a/be/src/olap/lru_cache.h b/be/src/olap/lru_cache.h
index 8d86fa9b1b..7d55d859c9 100644
--- a/be/src/olap/lru_cache.h
+++ b/be/src/olap/lru_cache.h
@@ -219,6 +219,10 @@ public:
virtual int64_t mem_consumption() = 0;
+ virtual int64_t get_usage() = 0;
+
+ virtual size_t get_total_capacity() = 0;
+
private:
DISALLOW_COPY_AND_ASSIGN(Cache);
};
@@ -376,6 +380,8 @@ public:
virtual int64_t prune() override;
int64_t prune_if(CacheValuePredicate pred, bool lazy_mode = false) override;
int64_t mem_consumption() override;
+ int64_t get_usage() override;
+ size_t get_total_capacity() override { return _total_capacity; };
private:
void update_cache_metrics() const;
@@ -391,6 +397,7 @@ private:
const uint32_t _num_shards;
LRUCache** _shards;
std::atomic<uint64_t> _last_id;
+ size_t _total_capacity;
std::unique_ptr<MemTrackerLimiter> _mem_tracker;
std::shared_ptr<MetricEntity> _entity = nullptr;
diff --git a/be/src/olap/segment_loader.h b/be/src/olap/segment_loader.h
index 8535e69281..03a54acf55 100644
--- a/be/src/olap/segment_loader.h
+++ b/be/src/olap/segment_loader.h
@@ -89,6 +89,12 @@ public:
Status prune();
int64_t prune_all() { return _cache->prune(); };
int64_t segment_cache_mem_consumption() { return _cache->mem_consumption(); }
+ int64_t segment_cache_get_usage() { return _cache->get_usage(); }
+ double segment_cache_get_usage_ratio() {
+ return _cache->get_total_capacity() == 0
+ ? 0
+ : ((double)_cache->get_usage() / _cache->get_total_capacity());
+ }
private:
SegmentLoader();
diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp
index 7d810555ae..a332583739 100644
--- a/be/src/runtime/fragment_mgr.cpp
+++ b/be/src/runtime/fragment_mgr.cpp
@@ -95,6 +95,8 @@ public:
Status execute();
Status cancel(const PPlanFragmentCancelReason& reason, const std::string& msg = "");
+ bool is_canceled() { return _cancelled; }
+
TUniqueId fragment_instance_id() const { return _fragment_instance_id; }
TUniqueId query_id() const { return _query_id; }
@@ -822,6 +824,20 @@ void FragmentMgr::cancel_query(const TUniqueId& query_id, const PPlanFragmentCan
}
}
+bool FragmentMgr::query_is_canceled(const TUniqueId& query_id) {
+ std::lock_guard<std::mutex> lock(_lock);
+ auto ctx = _fragments_ctx_map.find(query_id);
+ if (ctx != _fragments_ctx_map.end()) {
+ for (auto it : ctx->second->fragment_ids) {
+ auto exec_state_iter = _fragment_map.find(it);
+ if (exec_state_iter != _fragment_map.end() && exec_state_iter->second) {
+ return exec_state_iter->second->is_canceled();
+ }
+ }
+ }
+ return true;
+}
+
void FragmentMgr::cancel_worker() {
LOG(INFO) << "FragmentMgr cancel worker start working.";
do {
diff --git a/be/src/runtime/fragment_mgr.h b/be/src/runtime/fragment_mgr.h
index c5411eeed9..dc1567bdc2 100644
--- a/be/src/runtime/fragment_mgr.h
+++ b/be/src/runtime/fragment_mgr.h
@@ -81,6 +81,8 @@ public:
void cancel_query(const TUniqueId& query_id, const PPlanFragmentCancelReason& reason,
const std::string& msg = "");
+ bool query_is_canceled(const TUniqueId& query_id);
+
void cancel_worker();
virtual void debug(std::stringstream& ss);
diff --git a/be/src/runtime/memory/mem_tracker_limiter.cpp b/be/src/runtime/memory/mem_tracker_limiter.cpp
index 795aeccc94..3293c00914 100644
--- a/be/src/runtime/memory/mem_tracker_limiter.cpp
+++ b/be/src/runtime/memory/mem_tracker_limiter.cpp
@@ -295,6 +295,13 @@ int64_t MemTrackerLimiter::free_top_memory_query(int64_t min_free_mem,
std::lock_guard<std::mutex> l(mem_tracker_limiter_pool[i].group_lock);
for (auto tracker : mem_tracker_limiter_pool[i].trackers) {
if (tracker->type() == type) {
+ if (tracker->consumption() <= 104857600) { // 100M small query does not cancel
+ continue;
+ }
+ if (ExecEnv::GetInstance()->fragment_mgr()->query_is_canceled(
+ label_to_queryid(tracker->label()))) {
+ continue;
+ }
if (tracker->consumption() > min_free_mem) {
std::priority_queue<std::pair<int64_t, std::string>,
std::vector<std::pair<int64_t, std::string>>,
@@ -334,11 +341,15 @@ int64_t MemTrackerLimiter::free_top_overcommit_query(int64_t min_free_mem,
std::lock_guard<std::mutex> l(mem_tracker_limiter_pool[i].group_lock);
for (auto tracker : mem_tracker_limiter_pool[i].trackers) {
if (tracker->type() == type) {
- int64_t overcommit_ratio =
- (static_cast<double>(tracker->consumption()) / tracker->limit()) * 10000;
- if (overcommit_ratio == 0) { // Small query does not cancel
+ if (tracker->consumption() <= 104857600) { // 100M small query does not cancel
+ continue;
+ }
+ if (ExecEnv::GetInstance()->fragment_mgr()->query_is_canceled(
+ label_to_queryid(tracker->label()))) {
continue;
}
+ int64_t overcommit_ratio =
+ (static_cast<double>(tracker->consumption()) / tracker->limit()) * 10000;
min_pq.push(std::pair<int64_t, std::string>(overcommit_ratio, tracker->label()));
query_consumption[tracker->label()] = tracker->consumption();
}
diff --git a/be/src/util/mem_info.cpp b/be/src/util/mem_info.cpp
index 5fb42611ed..818223c1a0 100644
--- a/be/src/util/mem_info.cpp
+++ b/be/src/util/mem_info.cpp
@@ -93,30 +93,43 @@ void MemInfo::refresh_allocator_mem() {
void MemInfo::process_cache_gc(int64_t& freed_mem) {
// TODO, free more cache, and should free a certain percentage of capacity, not all.
- freed_mem += ChunkAllocator::instance()->mem_consumption();
- ChunkAllocator::instance()->clear();
- freed_mem +=
- StoragePageCache::instance()->get_page_cache_mem_consumption(segment_v2::DATA_PAGE);
- StoragePageCache::instance()->prune(segment_v2::DATA_PAGE);
- // TODO add freed_mem
- SegmentLoader::instance()->prune();
+ int32_t min_free_size = 33554432; // 32M
+ if (ChunkAllocator::instance()->mem_consumption() > min_free_size) {
+ freed_mem += ChunkAllocator::instance()->mem_consumption();
+ ChunkAllocator::instance()->clear();
+ }
+
+ if (StoragePageCache::instance()->get_page_cache_mem_consumption(segment_v2::DATA_PAGE) >
+ min_free_size) {
+ freed_mem +=
+ StoragePageCache::instance()->get_page_cache_mem_consumption(segment_v2::DATA_PAGE);
+ StoragePageCache::instance()->prune(segment_v2::DATA_PAGE);
+ }
}
// step1: free all cache
// step2: free top overcommit query, if enable query memroy overcommit
+// TODO Now, the meaning is different from java minor gc + full gc, more like small gc + large gc.
bool MemInfo::process_minor_gc() {
+ MonotonicStopWatch watch;
+ watch.start();
int64_t freed_mem = 0;
std::string vm_rss_str = PerfCounters::get_vm_rss_str();
std::string mem_available_str = MemInfo::sys_mem_available_str();
Defer defer {[&]() {
- LOG(INFO) << fmt::format("Process Minor GC Free Memory {} Bytes",
freed_mem);
+ LOG(INFO) << fmt::format("Process Minor GC Free Memory {} Bytes.
cost(us): {}", freed_mem,
+ watch.elapsed_time() / 1000);
}};
MemInfo::process_cache_gc(freed_mem);
if (freed_mem > _s_process_minor_gc_size) {
return true;
}
+
+ // TODO add freed_mem
+ SegmentLoader::instance()->prune();
+
if (config::enable_query_memroy_overcommit) {
freed_mem += MemTrackerLimiter::free_top_overcommit_query(
_s_process_minor_gc_size - freed_mem, vm_rss_str, mem_available_str);
@@ -132,22 +145,32 @@ bool MemInfo::process_minor_gc() {
// step3: free top overcommit load, load retries are more expensive, So cancel at the end.
// step4: free top memory load
bool MemInfo::process_full_gc() {
+ MonotonicStopWatch watch;
+ watch.start();
int64_t freed_mem = 0;
std::string vm_rss_str = PerfCounters::get_vm_rss_str();
std::string mem_available_str = MemInfo::sys_mem_available_str();
- Defer defer {
- [&]() { LOG(INFO) << fmt::format("Process Full GC Free Memory {} Bytes", freed_mem); }};
+ Defer defer {[&]() {
+ LOG(INFO) << fmt::format("Process Full GC Free Memory {} Bytes.
cost(us): {}", freed_mem,
+ watch.elapsed_time() / 1000);
+ }};
MemInfo::process_cache_gc(freed_mem);
if (freed_mem > _s_process_full_gc_size) {
return true;
}
- freed_mem += SegmentLoader::instance()->segment_cache_mem_consumption();
- SegmentLoader::instance()->prune_all();
- if (freed_mem > _s_process_full_gc_size) {
- return true;
+
+ if (SegmentLoader::instance()->segment_cache_get_usage_ratio() > 0.1) {
+ freed_mem += SegmentLoader::instance()->segment_cache_mem_consumption();
+ LOG(INFO) << "prune all " <<
SegmentLoader::instance()->segment_cache_get_usage()
+ << " entries in segment cache.";
+ SegmentLoader::instance()->prune_all();
+ if (freed_mem > _s_process_full_gc_size) {
+ return true;
+ }
}
+
freed_mem += MemTrackerLimiter::free_top_memory_query(_s_process_full_gc_size - freed_mem,
vm_rss_str, mem_available_str);
if (freed_mem > _s_process_full_gc_size) {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]