This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 3871e989ac [fix](memory) Avoid repeating meaningless memory gc #17258
3871e989ac is described below
commit 3871e989acc3d6c28f9621169e4be8828060c428
Author: Xinyi Zou <[email protected]>
AuthorDate: Wed Mar 1 19:23:33 2023 +0800
[fix](memory) Avoid repeating meaningless memory gc #17258
---
be/src/olap/lru_cache.cpp | 11 +++++-
be/src/olap/lru_cache.h | 7 ++++
be/src/olap/segment_loader.h | 6 ++++
be/src/runtime/fragment_mgr.cpp | 21 +++++++++++
be/src/runtime/fragment_mgr.h | 2 ++
be/src/runtime/memory/mem_tracker_limiter.cpp | 17 +++++++--
be/src/util/mem_info.cpp | 50 +++++++++++++++++++--------
7 files changed, 96 insertions(+), 18 deletions(-)
diff --git a/be/src/olap/lru_cache.cpp b/be/src/olap/lru_cache.cpp
index cbce38da1b..57d7500b4a 100644
--- a/be/src/olap/lru_cache.cpp
+++ b/be/src/olap/lru_cache.cpp
@@ -522,7 +522,8 @@ ShardedLRUCache::ShardedLRUCache(const std::string& name,
size_t total_capacity,
_num_shard_bits(Bits::FindLSBSetNonZero(num_shards)),
_num_shards(num_shards),
_shards(nullptr),
- _last_id(1) {
+ _last_id(1),
+ _total_capacity(total_capacity) {
_mem_tracker =
std::make_unique<MemTrackerLimiter>(MemTrackerLimiter::Type::GLOBAL, name);
CHECK(num_shards > 0) << "num_shards cannot be 0";
CHECK_EQ((num_shards & (num_shards - 1)), 0)
@@ -629,6 +630,14 @@ int64_t ShardedLRUCache::mem_consumption() {
return _mem_tracker->consumption();
}
+// Sums get_usage() over every shard. This is the cache's own usage
+// accounting, as opposed to mem_consumption(), which returns the
+// MemTrackerLimiter's consumption.
+// NOTE(review): the total is accumulated in size_t and narrowed to int64_t
+// on return; `int i` is compared against the uint32_t _num_shards.
+int64_t ShardedLRUCache::get_usage() {
+ size_t total_usage = 0;
+ for (int i = 0; i < _num_shards; i++) {
+ total_usage += _shards[i]->get_usage();
+ }
+ return total_usage;
+}
+
void ShardedLRUCache::update_cache_metrics() const {
size_t total_capacity = 0;
size_t total_usage = 0;
diff --git a/be/src/olap/lru_cache.h b/be/src/olap/lru_cache.h
index 1913321df3..8226ecc858 100644
--- a/be/src/olap/lru_cache.h
+++ b/be/src/olap/lru_cache.h
@@ -225,6 +225,10 @@ public:
virtual int64_t mem_consumption() = 0;
+ // Current usage as accounted by the cache implementation itself (not the
+ // mem tracker); ShardedLRUCache sums it over its shards.
+ virtual int64_t get_usage() = 0;
+
+ // Total capacity of the cache; ShardedLRUCache returns the total_capacity
+ // passed to its constructor.
+ virtual size_t get_total_capacity() = 0;
+
private:
DISALLOW_COPY_AND_ASSIGN(Cache);
};
@@ -411,6 +415,8 @@ public:
virtual int64_t prune() override;
int64_t prune_if(CacheValuePredicate pred, bool lazy_mode = false)
override;
int64_t mem_consumption() override;
+ // Sum of get_usage() over all shards.
+ int64_t get_usage() override;
+ // The total_capacity passed to the constructor.
+ size_t get_total_capacity() override { return _total_capacity; };
private:
void update_cache_metrics() const;
@@ -426,6 +432,7 @@ private:
const uint32_t _num_shards;
LRUCache** _shards;
std::atomic<uint64_t> _last_id;
+ // total_capacity given to the constructor, exposed via get_total_capacity().
+ size_t _total_capacity;
std::unique_ptr<MemTrackerLimiter> _mem_tracker;
std::shared_ptr<MetricEntity> _entity = nullptr;
diff --git a/be/src/olap/segment_loader.h b/be/src/olap/segment_loader.h
index 8535e69281..03a54acf55 100644
--- a/be/src/olap/segment_loader.h
+++ b/be/src/olap/segment_loader.h
@@ -89,6 +89,12 @@ public:
Status prune();
int64_t prune_all() { return _cache->prune(); };
int64_t segment_cache_mem_consumption() { return
_cache->mem_consumption(); }
+ // Current usage of the underlying segment cache (_cache->get_usage()).
+ int64_t segment_cache_get_usage() { return _cache->get_usage(); }
+ // Fraction of the segment cache's total capacity in use, i.e.
+ // get_usage() / get_total_capacity(); returns 0 when the capacity is 0
+ // to avoid dividing by zero.
+ double segment_cache_get_usage_ratio() {
+ return _cache->get_total_capacity() == 0
+ ? 0
+ : ((double)_cache->get_usage() /
_cache->get_total_capacity());
+ }
private:
SegmentLoader();
diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp
index 02644f186d..0b15d0a518 100644
--- a/be/src/runtime/fragment_mgr.cpp
+++ b/be/src/runtime/fragment_mgr.cpp
@@ -90,6 +90,8 @@ public:
Status execute();
Status cancel(const PPlanFragmentCancelReason& reason, const std::string&
msg = "");
+ // Returns this execution state's _cancelled flag.
+ bool is_canceled() { return _cancelled; }
+
TUniqueId fragment_instance_id() const { return _fragment_instance_id; }
TUniqueId query_id() const { return _query_id; }
@@ -1074,6 +1076,25 @@ void FragmentMgr::cancel_query(const TUniqueId&
query_id, const PPlanFragmentCan
}
}
+// Reports whether the memory GC should treat `query_id` as already canceled.
+// Under _lock, finds the query's fragments context and returns the cancel
+// state of the first of its fragment ids present in _fragment_map or
+// _pipeline_map. Returns true when the query context is not registered or
+// none of its fragment ids is present, so free_top_memory_query() and
+// free_top_overcommit_query() skip such queries.
+// NOTE(review): only the first matching fragment is consulted; this assumes
+// a query's fragments are canceled together — confirm.
+// NOTE(review): the GC callers invoke this while holding a mem tracker
+// group_lock, so _lock is acquired nested inside group_lock; verify no path
+// acquires group_lock while holding _lock (lock-order inversion).
+bool FragmentMgr::query_is_canceled(const TUniqueId& query_id) {
+ std::lock_guard<std::mutex> lock(_lock);
+ auto ctx = _fragments_ctx_map.find(query_id);
+ if (ctx != _fragments_ctx_map.end()) {
+ // NOTE(review): `auto` copies each fragment id; `const auto&` avoids it.
+ for (auto it : ctx->second->fragment_ids) {
+ auto exec_state_iter = _fragment_map.find(it);
+ if (exec_state_iter != _fragment_map.end() &&
exec_state_iter->second) {
+ return exec_state_iter->second->is_canceled();
+ }
+
+ auto pipeline_ctx_iter = _pipeline_map.find(it);
+ if (pipeline_ctx_iter != _pipeline_map.end() &&
pipeline_ctx_iter->second) {
+ return pipeline_ctx_iter->second->is_canceled();
+ }
+ }
+ }
+ // Query not registered, or none of its fragments is still in either map.
+ return true;
+}
+
void FragmentMgr::cancel_worker() {
LOG(INFO) << "FragmentMgr cancel worker start working.";
do {
diff --git a/be/src/runtime/fragment_mgr.h b/be/src/runtime/fragment_mgr.h
index 6508ea0659..455e81c8d5 100644
--- a/be/src/runtime/fragment_mgr.h
+++ b/be/src/runtime/fragment_mgr.h
@@ -108,6 +108,8 @@ public:
void cancel_query(const TUniqueId& query_id, const
PPlanFragmentCancelReason& reason,
const std::string& msg = "");
+ // Whether the memory GC should treat this query as already canceled; also
+ // true when the query is not registered with this FragmentMgr.
+ bool query_is_canceled(const TUniqueId& query_id);
+
void cancel_worker();
void debug(std::stringstream& ss) override;
diff --git a/be/src/runtime/memory/mem_tracker_limiter.cpp
b/be/src/runtime/memory/mem_tracker_limiter.cpp
index 9ce82ca585..1a56458586 100644
--- a/be/src/runtime/memory/mem_tracker_limiter.cpp
+++ b/be/src/runtime/memory/mem_tracker_limiter.cpp
@@ -296,6 +296,13 @@ int64_t MemTrackerLimiter::free_top_memory_query(int64_t
min_free_mem,
std::lock_guard<std::mutex> l(mem_tracker_limiter_pool[i].group_lock);
for (auto tracker : mem_tracker_limiter_pool[i].trackers) {
if (tracker->type() == type) {
+ if (tracker->consumption() <= 104857600) { // 100M small query
does not cancel
+ continue;
+ }
+ if (ExecEnv::GetInstance()->fragment_mgr()->query_is_canceled(
+ label_to_queryid(tracker->label()))) {
+ continue;
+ }
if (tracker->consumption() > min_free_mem) {
std::priority_queue<std::pair<int64_t, std::string>,
std::vector<std::pair<int64_t,
std::string>>,
@@ -335,11 +342,15 @@ int64_t
MemTrackerLimiter::free_top_overcommit_query(int64_t min_free_mem,
std::lock_guard<std::mutex> l(mem_tracker_limiter_pool[i].group_lock);
for (auto tracker : mem_tracker_limiter_pool[i].trackers) {
if (tracker->type() == type) {
- int64_t overcommit_ratio =
- (static_cast<double>(tracker->consumption()) /
tracker->limit()) * 10000;
- if (overcommit_ratio == 0) { // Small query does not cancel
+ if (tracker->consumption() <= 104857600) { // 100M small query
does not cancel
+ continue;
+ }
+ if (ExecEnv::GetInstance()->fragment_mgr()->query_is_canceled(
+ label_to_queryid(tracker->label()))) {
continue;
}
+ int64_t overcommit_ratio =
+ (static_cast<double>(tracker->consumption()) /
tracker->limit()) * 10000;
min_pq.push(std::pair<int64_t, std::string>(overcommit_ratio,
tracker->label()));
query_consumption[tracker->label()] = tracker->consumption();
}
diff --git a/be/src/util/mem_info.cpp b/be/src/util/mem_info.cpp
index 4f53abe919..0bd547ea0e 100644
--- a/be/src/util/mem_info.cpp
+++ b/be/src/util/mem_info.cpp
@@ -93,31 +93,43 @@ void MemInfo::refresh_allocator_mem() {
+// For ChunkAllocator and the DATA_PAGE part of the storage page cache, clears
+// or prunes each one whose consumption exceeds min_free_size and adds that
+// consumption to freed_mem.
void MemInfo::process_cache_gc(int64_t& freed_mem) {
// TODO, free more cache, and should free a certain percentage of
capacity, not all.
- freed_mem += ChunkAllocator::instance()->mem_consumption();
- ChunkAllocator::instance()->clear();
- freed_mem +=
-
StoragePageCache::instance()->get_page_cache_mem_consumption(segment_v2::DATA_PAGE);
- StoragePageCache::instance()->prune(segment_v2::DATA_PAGE);
- // TODO add freed_mem
- SegmentLoader::instance()->prune();
+ // Only clear a cache that holds more than min_free_size (32M), so repeated
+ // GC rounds do not spend time clearing caches that are already nearly empty.
+ // NOTE(review): could be a `constexpr int64_t`. Each consumption value is
+ // read twice before clear()/prune(), so freed_mem is an estimate taken
+ // beforehand, not a measurement of what was actually released.
+ int32_t min_free_size = 33554432; // 32M
+ if (ChunkAllocator::instance()->mem_consumption() > min_free_size) {
+ freed_mem += ChunkAllocator::instance()->mem_consumption();
+ ChunkAllocator::instance()->clear();
+ }
+
+ if
(StoragePageCache::instance()->get_page_cache_mem_consumption(segment_v2::DATA_PAGE)
>
+ min_free_size) {
+ freed_mem +=
+
StoragePageCache::instance()->get_page_cache_mem_consumption(segment_v2::DATA_PAGE);
+ StoragePageCache::instance()->prune(segment_v2::DATA_PAGE);
+ }
}
// step1: free all cache
// step2: free top overcommit query, if enable query memroy overcommit
// TODO Now, the meaning is different from java minor gc + full gc, more like
small gc + large gc.
bool MemInfo::process_minor_gc() {
+ MonotonicStopWatch watch;
+ watch.start();
int64_t freed_mem = 0;
std::string vm_rss_str = PerfCounters::get_vm_rss_str();
std::string mem_available_str = MemInfo::sys_mem_available_str();
Defer defer {[&]() {
- LOG(INFO) << fmt::format("Process Minor GC Free Memory {} Bytes",
freed_mem);
+ LOG(INFO) << fmt::format("Process Minor GC Free Memory {} Bytes.
cost(us): {}", freed_mem,
+ watch.elapsed_time() / 1000);
}};
MemInfo::process_cache_gc(freed_mem);
if (freed_mem > _s_process_minor_gc_size) {
return true;
}
+
+ // TODO add freed_mem
+ SegmentLoader::instance()->prune();
+
if (config::enable_query_memroy_overcommit) {
freed_mem += MemTrackerLimiter::free_top_overcommit_query(
_s_process_minor_gc_size - freed_mem, vm_rss_str,
mem_available_str);
@@ -133,22 +145,32 @@ bool MemInfo::process_minor_gc() {
// step3: free top overcommit load, load retries are more expensive, So cancel
at the end.
// step4: free top memory load
bool MemInfo::process_full_gc() {
+ MonotonicStopWatch watch;
+ watch.start();
int64_t freed_mem = 0;
std::string vm_rss_str = PerfCounters::get_vm_rss_str();
std::string mem_available_str = MemInfo::sys_mem_available_str();
- Defer defer {
- [&]() { LOG(INFO) << fmt::format("Process Full GC Free Memory {}
Bytes", freed_mem); }};
+ Defer defer {[&]() {
+ LOG(INFO) << fmt::format("Process Full GC Free Memory {} Bytes.
cost(us): {}", freed_mem,
+ watch.elapsed_time() / 1000);
+ }};
MemInfo::process_cache_gc(freed_mem);
if (freed_mem > _s_process_full_gc_size) {
return true;
}
- freed_mem += SegmentLoader::instance()->segment_cache_mem_consumption();
- SegmentLoader::instance()->prune_all();
- if (freed_mem > _s_process_full_gc_size) {
- return true;
+
+ if (SegmentLoader::instance()->segment_cache_get_usage_ratio() > 0.1) {
+ freed_mem +=
SegmentLoader::instance()->segment_cache_mem_consumption();
+ LOG(INFO) << "prune all " <<
SegmentLoader::instance()->segment_cache_get_usage()
+ << " entries in segment cache.";
+ SegmentLoader::instance()->prune_all();
+ if (freed_mem > _s_process_full_gc_size) {
+ return true;
+ }
}
+
freed_mem +=
MemTrackerLimiter::free_top_memory_query(_s_process_full_gc_size - freed_mem,
vm_rss_str,
mem_available_str);
if (freed_mem > _s_process_full_gc_size) {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]