This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 3871e989ac [fix](memory) Avoid repeating meaningless memory gc #17258
3871e989ac is described below

commit 3871e989acc3d6c28f9621169e4be8828060c428
Author: Xinyi Zou <[email protected]>
AuthorDate: Wed Mar 1 19:23:33 2023 +0800

    [fix](memory) Avoid repeating meaningless memory gc #17258
---
 be/src/olap/lru_cache.cpp                     | 11 +++++-
 be/src/olap/lru_cache.h                       |  7 ++++
 be/src/olap/segment_loader.h                  |  6 ++++
 be/src/runtime/fragment_mgr.cpp               | 21 +++++++++++
 be/src/runtime/fragment_mgr.h                 |  2 ++
 be/src/runtime/memory/mem_tracker_limiter.cpp | 17 +++++++--
 be/src/util/mem_info.cpp                      | 50 +++++++++++++++++++--------
 7 files changed, 96 insertions(+), 18 deletions(-)

diff --git a/be/src/olap/lru_cache.cpp b/be/src/olap/lru_cache.cpp
index cbce38da1b..57d7500b4a 100644
--- a/be/src/olap/lru_cache.cpp
+++ b/be/src/olap/lru_cache.cpp
@@ -522,7 +522,8 @@ ShardedLRUCache::ShardedLRUCache(const std::string& name, 
size_t total_capacity,
           _num_shard_bits(Bits::FindLSBSetNonZero(num_shards)),
           _num_shards(num_shards),
           _shards(nullptr),
-          _last_id(1) {
+          _last_id(1),
+          _total_capacity(total_capacity) {
     _mem_tracker = 
std::make_unique<MemTrackerLimiter>(MemTrackerLimiter::Type::GLOBAL, name);
     CHECK(num_shards > 0) << "num_shards cannot be 0";
     CHECK_EQ((num_shards & (num_shards - 1)), 0)
@@ -629,6 +630,14 @@ int64_t ShardedLRUCache::mem_consumption() {
     return _mem_tracker->consumption();
 }
 
+int64_t ShardedLRUCache::get_usage() {
+    size_t total_usage = 0;
+    for (int i = 0; i < _num_shards; i++) {
+        total_usage += _shards[i]->get_usage();
+    }
+    return total_usage;
+}
+
 void ShardedLRUCache::update_cache_metrics() const {
     size_t total_capacity = 0;
     size_t total_usage = 0;
diff --git a/be/src/olap/lru_cache.h b/be/src/olap/lru_cache.h
index 1913321df3..8226ecc858 100644
--- a/be/src/olap/lru_cache.h
+++ b/be/src/olap/lru_cache.h
@@ -225,6 +225,10 @@ public:
 
     virtual int64_t mem_consumption() = 0;
 
+    virtual int64_t get_usage() = 0;
+
+    virtual size_t get_total_capacity() = 0;
+
 private:
     DISALLOW_COPY_AND_ASSIGN(Cache);
 };
@@ -411,6 +415,8 @@ public:
     virtual int64_t prune() override;
     int64_t prune_if(CacheValuePredicate pred, bool lazy_mode = false) 
override;
     int64_t mem_consumption() override;
+    int64_t get_usage() override;
+    size_t get_total_capacity() override { return _total_capacity; };
 
 private:
     void update_cache_metrics() const;
@@ -426,6 +432,7 @@ private:
     const uint32_t _num_shards;
     LRUCache** _shards;
     std::atomic<uint64_t> _last_id;
+    size_t _total_capacity;
 
     std::unique_ptr<MemTrackerLimiter> _mem_tracker;
     std::shared_ptr<MetricEntity> _entity = nullptr;
diff --git a/be/src/olap/segment_loader.h b/be/src/olap/segment_loader.h
index 8535e69281..03a54acf55 100644
--- a/be/src/olap/segment_loader.h
+++ b/be/src/olap/segment_loader.h
@@ -89,6 +89,12 @@ public:
     Status prune();
     int64_t prune_all() { return _cache->prune(); };
     int64_t segment_cache_mem_consumption() { return 
_cache->mem_consumption(); }
+    int64_t segment_cache_get_usage() { return _cache->get_usage(); }
+    double segment_cache_get_usage_ratio() {
+        return _cache->get_total_capacity() == 0
+                       ? 0
+                       : ((double)_cache->get_usage() / 
_cache->get_total_capacity());
+    }
 
 private:
     SegmentLoader();
diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp
index 02644f186d..0b15d0a518 100644
--- a/be/src/runtime/fragment_mgr.cpp
+++ b/be/src/runtime/fragment_mgr.cpp
@@ -90,6 +90,8 @@ public:
     Status execute();
 
     Status cancel(const PPlanFragmentCancelReason& reason, const std::string& 
msg = "");
+    bool is_canceled() { return _cancelled; }
+
     TUniqueId fragment_instance_id() const { return _fragment_instance_id; }
 
     TUniqueId query_id() const { return _query_id; }
@@ -1074,6 +1076,25 @@ void FragmentMgr::cancel_query(const TUniqueId& 
query_id, const PPlanFragmentCan
     }
 }
 
+bool FragmentMgr::query_is_canceled(const TUniqueId& query_id) {
+    std::lock_guard<std::mutex> lock(_lock);
+    auto ctx = _fragments_ctx_map.find(query_id);
+    if (ctx != _fragments_ctx_map.end()) {
+        for (auto it : ctx->second->fragment_ids) {
+            auto exec_state_iter = _fragment_map.find(it);
+            if (exec_state_iter != _fragment_map.end() && 
exec_state_iter->second) {
+                return exec_state_iter->second->is_canceled();
+            }
+
+            auto pipeline_ctx_iter = _pipeline_map.find(it);
+            if (pipeline_ctx_iter != _pipeline_map.end() && 
pipeline_ctx_iter->second) {
+                return pipeline_ctx_iter->second->is_canceled();
+            }
+        }
+    }
+    return true;
+}
+
 void FragmentMgr::cancel_worker() {
     LOG(INFO) << "FragmentMgr cancel worker start working.";
     do {
diff --git a/be/src/runtime/fragment_mgr.h b/be/src/runtime/fragment_mgr.h
index 6508ea0659..455e81c8d5 100644
--- a/be/src/runtime/fragment_mgr.h
+++ b/be/src/runtime/fragment_mgr.h
@@ -108,6 +108,8 @@ public:
     void cancel_query(const TUniqueId& query_id, const 
PPlanFragmentCancelReason& reason,
                       const std::string& msg = "");
 
+    bool query_is_canceled(const TUniqueId& query_id);
+
     void cancel_worker();
 
     void debug(std::stringstream& ss) override;
diff --git a/be/src/runtime/memory/mem_tracker_limiter.cpp 
b/be/src/runtime/memory/mem_tracker_limiter.cpp
index 9ce82ca585..1a56458586 100644
--- a/be/src/runtime/memory/mem_tracker_limiter.cpp
+++ b/be/src/runtime/memory/mem_tracker_limiter.cpp
@@ -296,6 +296,13 @@ int64_t MemTrackerLimiter::free_top_memory_query(int64_t 
min_free_mem,
         std::lock_guard<std::mutex> l(mem_tracker_limiter_pool[i].group_lock);
         for (auto tracker : mem_tracker_limiter_pool[i].trackers) {
             if (tracker->type() == type) {
+                if (tracker->consumption() <= 104857600) { // 100M small query 
does not cancel
+                    continue;
+                }
+                if (ExecEnv::GetInstance()->fragment_mgr()->query_is_canceled(
+                            label_to_queryid(tracker->label()))) {
+                    continue;
+                }
                 if (tracker->consumption() > min_free_mem) {
                     std::priority_queue<std::pair<int64_t, std::string>,
                                         std::vector<std::pair<int64_t, 
std::string>>,
@@ -335,11 +342,15 @@ int64_t 
MemTrackerLimiter::free_top_overcommit_query(int64_t min_free_mem,
         std::lock_guard<std::mutex> l(mem_tracker_limiter_pool[i].group_lock);
         for (auto tracker : mem_tracker_limiter_pool[i].trackers) {
             if (tracker->type() == type) {
-                int64_t overcommit_ratio =
-                        (static_cast<double>(tracker->consumption()) / 
tracker->limit()) * 10000;
-                if (overcommit_ratio == 0) { // Small query does not cancel
+                if (tracker->consumption() <= 104857600) { // 100M small query 
does not cancel
+                    continue;
+                }
+                if (ExecEnv::GetInstance()->fragment_mgr()->query_is_canceled(
+                            label_to_queryid(tracker->label()))) {
                     continue;
                 }
+                int64_t overcommit_ratio =
+                        (static_cast<double>(tracker->consumption()) / 
tracker->limit()) * 10000;
                 min_pq.push(std::pair<int64_t, std::string>(overcommit_ratio, 
tracker->label()));
                 query_consumption[tracker->label()] = tracker->consumption();
             }
diff --git a/be/src/util/mem_info.cpp b/be/src/util/mem_info.cpp
index 4f53abe919..0bd547ea0e 100644
--- a/be/src/util/mem_info.cpp
+++ b/be/src/util/mem_info.cpp
@@ -93,31 +93,43 @@ void MemInfo::refresh_allocator_mem() {
 
 void MemInfo::process_cache_gc(int64_t& freed_mem) {
     // TODO, free more cache, and should free a certain percentage of 
capacity, not all.
-    freed_mem += ChunkAllocator::instance()->mem_consumption();
-    ChunkAllocator::instance()->clear();
-    freed_mem +=
-            
StoragePageCache::instance()->get_page_cache_mem_consumption(segment_v2::DATA_PAGE);
-    StoragePageCache::instance()->prune(segment_v2::DATA_PAGE);
-    // TODO add freed_mem
-    SegmentLoader::instance()->prune();
+    int32_t min_free_size = 33554432; // 32M
+    if (ChunkAllocator::instance()->mem_consumption() > min_free_size) {
+        freed_mem += ChunkAllocator::instance()->mem_consumption();
+        ChunkAllocator::instance()->clear();
+    }
+
+    if 
(StoragePageCache::instance()->get_page_cache_mem_consumption(segment_v2::DATA_PAGE)
 >
+        min_free_size) {
+        freed_mem +=
+                
StoragePageCache::instance()->get_page_cache_mem_consumption(segment_v2::DATA_PAGE);
+        StoragePageCache::instance()->prune(segment_v2::DATA_PAGE);
+    }
 }
 
 // step1: free all cache
 // step2: free top overcommit query, if enable query memroy overcommit
 // TODO Now, the meaning is different from java minor gc + full gc, more like 
small gc + large gc.
 bool MemInfo::process_minor_gc() {
+    MonotonicStopWatch watch;
+    watch.start();
     int64_t freed_mem = 0;
     std::string vm_rss_str = PerfCounters::get_vm_rss_str();
     std::string mem_available_str = MemInfo::sys_mem_available_str();
 
     Defer defer {[&]() {
-        LOG(INFO) << fmt::format("Process Minor GC Free Memory {} Bytes", 
freed_mem);
+        LOG(INFO) << fmt::format("Process Minor GC Free Memory {} Bytes. 
cost(us): {}", freed_mem,
+                                 watch.elapsed_time() / 1000);
     }};
 
     MemInfo::process_cache_gc(freed_mem);
     if (freed_mem > _s_process_minor_gc_size) {
         return true;
     }
+
+    // TODO add freed_mem
+    SegmentLoader::instance()->prune();
+
     if (config::enable_query_memroy_overcommit) {
         freed_mem += MemTrackerLimiter::free_top_overcommit_query(
                 _s_process_minor_gc_size - freed_mem, vm_rss_str, 
mem_available_str);
@@ -133,22 +145,32 @@ bool MemInfo::process_minor_gc() {
 // step3: free top overcommit load, load retries are more expensive, So cancel 
at the end.
 // step4: free top memory load
 bool MemInfo::process_full_gc() {
+    MonotonicStopWatch watch;
+    watch.start();
     int64_t freed_mem = 0;
     std::string vm_rss_str = PerfCounters::get_vm_rss_str();
     std::string mem_available_str = MemInfo::sys_mem_available_str();
 
-    Defer defer {
-            [&]() { LOG(INFO) << fmt::format("Process Full GC Free Memory {} 
Bytes", freed_mem); }};
+    Defer defer {[&]() {
+        LOG(INFO) << fmt::format("Process Full GC Free Memory {} Bytes. 
cost(us): {}", freed_mem,
+                                 watch.elapsed_time() / 1000);
+    }};
 
     MemInfo::process_cache_gc(freed_mem);
     if (freed_mem > _s_process_full_gc_size) {
         return true;
     }
-    freed_mem += SegmentLoader::instance()->segment_cache_mem_consumption();
-    SegmentLoader::instance()->prune_all();
-    if (freed_mem > _s_process_full_gc_size) {
-        return true;
+
+    if (SegmentLoader::instance()->segment_cache_get_usage_ratio() > 0.1) {
+        freed_mem += 
SegmentLoader::instance()->segment_cache_mem_consumption();
+        LOG(INFO) << "prune all " << 
SegmentLoader::instance()->segment_cache_get_usage()
+                  << " entries in segment cache.";
+        SegmentLoader::instance()->prune_all();
+        if (freed_mem > _s_process_full_gc_size) {
+            return true;
+        }
     }
+
     freed_mem += 
MemTrackerLimiter::free_top_memory_query(_s_process_full_gc_size - freed_mem,
                                                           vm_rss_str, 
mem_available_str);
     if (freed_mem > _s_process_full_gc_size) {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to