This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch branch-1.2-lts
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-1.2-lts by this push:
     new 754d2d9f23 [cherrypick](memory) Pick memory GC refactor to avoid 
frequent invalid GC (#17526)
754d2d9f23 is described below

commit 754d2d9f23df9d56d4eeb1939febcfab553a4b1d
Author: Xinyi Zou <[email protected]>
AuthorDate: Wed Mar 8 07:27:16 2023 +0800

    [cherrypick](memory) Pick memory GC refactor to avoid frequent invalid GC 
(#17526)
    
    * [fix](memory) split mem usage thread and gc thread to different threads 
(#17213)
    
    Ensure that the memory status is refreshed in time
    Avoid frequent GC
    
    * [fix](memory) Avoid repeating meaningless memory gc #17258
---
 be/src/common/config.h                        |  6 +-
 be/src/common/daemon.cpp                      | 93 ++++++++++++++++++---------
 be/src/common/daemon.h                        |  2 +
 be/src/olap/lru_cache.cpp                     | 11 +++-
 be/src/olap/lru_cache.h                       |  7 ++
 be/src/olap/segment_loader.h                  |  6 ++
 be/src/runtime/fragment_mgr.cpp               | 16 +++++
 be/src/runtime/fragment_mgr.h                 |  2 +
 be/src/runtime/memory/mem_tracker_limiter.cpp | 17 ++++-
 be/src/util/mem_info.cpp                      | 51 +++++++++++----
 10 files changed, 161 insertions(+), 50 deletions(-)

diff --git a/be/src/common/config.h b/be/src/common/config.h
index 5f64f97f9b..63b9bdd779 100644
--- a/be/src/common/config.h
+++ b/be/src/common/config.h
@@ -511,7 +511,11 @@ CONF_String(buffer_pool_limit, "20%");
 CONF_String(buffer_pool_clean_pages_limit, "50%");
 
 // Sleep time in milliseconds between memory maintenance iterations
-CONF_mInt32(memory_maintenance_sleep_time_ms, "500");
+CONF_mInt32(memory_maintenance_sleep_time_ms, "100");
+
+// After full gc, no longer full gc and minor gc during sleep.
+// After minor gc, no minor gc during sleep, but full gc is possible.
+CONF_mInt32(memory_gc_sleep_time_s, "1");
 
 // Sleep time in milliseconds between load channel memory refresh iterations
 CONF_mInt64(load_channel_memory_refresh_sleep_time_ms, "100");
diff --git a/be/src/common/daemon.cpp b/be/src/common/daemon.cpp
index 2bdcbd414b..01dbe847eb 100644
--- a/be/src/common/daemon.cpp
+++ b/be/src/common/daemon.cpp
@@ -204,8 +204,7 @@ void Daemon::buffer_pool_gc_thread() {
 
 void Daemon::memory_maintenance_thread() {
     int32_t interval_milliseconds = config::memory_maintenance_sleep_time_ms;
-    int32_t cache_gc_interval_ms = config::cache_gc_interval_s * 1000;
-    int64_t cache_gc_freed_mem = 0;
+    int64_t last_print_proc_mem = PerfCounters::get_vm_rss();
     while (!_stop_background_threads_latch.wait_for(
             std::chrono::milliseconds(interval_milliseconds))) {
         if (!MemInfo::initialized() || !ExecEnv::GetInstance()->initialized()) 
{
@@ -214,56 +213,81 @@ void Daemon::memory_maintenance_thread() {
         // Refresh process memory metrics.
         doris::PerfCounters::refresh_proc_status();
         doris::MemInfo::refresh_proc_meminfo();
+        doris::MemInfo::refresh_proc_mem_no_allocator_cache();
+
+        // Update and print memory stat when the memory changes by 100M.
+        if (abs(last_print_proc_mem - PerfCounters::get_vm_rss()) > 104857600) 
{
+            last_print_proc_mem = PerfCounters::get_vm_rss();
+            doris::MemTrackerLimiter::enable_print_log_process_usage();
 
-        // Refresh allocator memory metrics.
+            // Refresh mem tracker each type counter.
+            doris::MemTrackerLimiter::refresh_global_counter();
+
+            // Refresh allocator memory metrics.
 #if !defined(ADDRESS_SANITIZER) && !defined(LEAK_SANITIZER) && 
!defined(THREAD_SANITIZER)
-        doris::MemInfo::refresh_allocator_mem();
-        if (config::enable_system_metrics) {
-            
DorisMetrics::instance()->system_metrics()->update_allocator_metrics();
-        }
+            doris::MemInfo::refresh_allocator_mem();
+            if (config::enable_system_metrics) {
+                
DorisMetrics::instance()->system_metrics()->update_allocator_metrics();
+            }
 #endif
-        doris::MemInfo::refresh_proc_mem_no_allocator_cache();
-
-        // Refresh mem tracker each type metrics.
-        doris::MemTrackerLimiter::refresh_global_counter();
+            if (doris::config::memory_debug) {
+                LOG(INFO) << MemTrackerLimiter::process_mem_log_str();
+                LOG_EVERY_N(INFO, 10)
+                        << 
doris::MemTrackerLimiter::log_process_usage_str("memory debug", false);
+            }
+        }
+    }
+}
 
-        // If system available memory is not enough, or the process memory 
exceeds the limit, reduce refresh interval.
-        if (doris::MemInfo::sys_mem_available() <
-                    doris::MemInfo::sys_mem_available_low_water_mark() ||
-            doris::MemInfo::proc_mem_no_allocator_cache() >= 
doris::MemInfo::mem_limit()) {
+void Daemon::memory_gc_thread() {
+    int32_t interval_milliseconds = config::memory_maintenance_sleep_time_ms;
+    int32_t cache_gc_interval_ms = config::cache_gc_interval_s * 1000;
+    int32_t memory_minor_gc_sleep_time_ms = 0;
+    int32_t memory_full_gc_sleep_time_ms = 0;
+    int64_t cache_gc_freed_mem = 0;
+    while (!_stop_background_threads_latch.wait_for(
+            std::chrono::milliseconds(interval_milliseconds))) {
+        if (!MemInfo::initialized()) {
+            continue;
+        }
+        if (memory_full_gc_sleep_time_ms <= 0 &&
+            (doris::MemInfo::sys_mem_available() <
+                     doris::MemInfo::sys_mem_available_low_water_mark() ||
+             doris::MemInfo::proc_mem_no_allocator_cache() >= 
doris::MemInfo::mem_limit())) {
+            // No longer full gc and minor gc during sleep.
+            memory_full_gc_sleep_time_ms = config::memory_gc_sleep_time_s * 
1000;
+            memory_minor_gc_sleep_time_ms = config::memory_gc_sleep_time_s * 
1000;
+            cache_gc_interval_ms = config::cache_gc_interval_s * 1000;
             doris::MemTrackerLimiter::print_log_process_usage("process full 
gc", false);
-            interval_milliseconds = std::min(100, 
config::memory_maintenance_sleep_time_ms);
             if (doris::MemInfo::process_full_gc()) {
                 // If there is not enough memory to be gc, the process memory 
usage will not be printed in the next continuous gc.
                 doris::MemTrackerLimiter::enable_print_log_process_usage();
             }
+        } else if (memory_minor_gc_sleep_time_ms <= 0 &&
+                   (doris::MemInfo::sys_mem_available() <
+                            
doris::MemInfo::sys_mem_available_warning_water_mark() ||
+                    doris::MemInfo::proc_mem_no_allocator_cache() >=
+                            doris::MemInfo::soft_mem_limit())) {
+            // No minor gc during sleep, but full gc is possible.
+            memory_minor_gc_sleep_time_ms = config::memory_gc_sleep_time_s * 
1000;
             cache_gc_interval_ms = config::cache_gc_interval_s * 1000;
-        } else if (doris::MemInfo::sys_mem_available() <
-                           
doris::MemInfo::sys_mem_available_warning_water_mark() ||
-                   doris::MemInfo::proc_mem_no_allocator_cache() >=
-                           doris::MemInfo::soft_mem_limit()) {
             doris::MemTrackerLimiter::print_log_process_usage("process minor 
gc", false);
-            interval_milliseconds = std::min(200, 
config::memory_maintenance_sleep_time_ms);
             if (doris::MemInfo::process_minor_gc()) {
                 doris::MemTrackerLimiter::enable_print_log_process_usage();
             }
-            cache_gc_interval_ms = config::cache_gc_interval_s * 1000;
         } else {
-            doris::MemTrackerLimiter::enable_print_log_process_usage();
-            interval_milliseconds = config::memory_maintenance_sleep_time_ms;
-            if (doris::config::memory_debug) {
-                LOG_EVERY_N(WARNING, 20) << 
doris::MemTrackerLimiter::log_process_usage_str(
-                        "memory debug", false); // default 10s print once
-            } else {
-                LOG_EVERY_N(INFO, 10)
-                        << MemTrackerLimiter::process_mem_log_str(); // 
default 5s print once
+            if (memory_full_gc_sleep_time_ms > 0) {
+                memory_full_gc_sleep_time_ms -= interval_milliseconds;
+            }
+            if (memory_minor_gc_sleep_time_ms > 0) {
+                memory_minor_gc_sleep_time_ms -= interval_milliseconds;
             }
             cache_gc_interval_ms -= interval_milliseconds;
             if (cache_gc_interval_ms < 0) {
                 cache_gc_freed_mem = 0;
                 doris::MemInfo::process_cache_gc(cache_gc_freed_mem);
                 LOG(INFO) << fmt::format("Process regular GC Cache, Free 
Memory {} Bytes",
-                                         cache_gc_freed_mem); // default 6s 
print once
+                                         cache_gc_freed_mem);
                 cache_gc_interval_ms = config::cache_gc_interval_s * 1000;
             }
         }
@@ -464,6 +488,10 @@ void Daemon::start() {
             "Daemon", "memory_maintenance_thread", [this]() { 
this->memory_maintenance_thread(); },
             &_memory_maintenance_thread);
     CHECK(st.ok()) << st;
+    st = Thread::create(
+            "Daemon", "memory_gc_thread", [this]() { this->memory_gc_thread(); 
},
+            &_memory_gc_thread);
+    CHECK(st.ok()) << st;
     st = Thread::create(
             "Daemon", "load_channel_tracker_refresh_thread",
             [this]() { this->load_channel_tracker_refresh_thread(); },
@@ -490,6 +518,9 @@ void Daemon::stop() {
     if (_memory_maintenance_thread) {
         _memory_maintenance_thread->join();
     }
+    if (_memory_gc_thread) {
+        _memory_gc_thread->join();
+    }
     if (_load_channel_tracker_refresh_thread) {
         _load_channel_tracker_refresh_thread->join();
     }
diff --git a/be/src/common/daemon.h b/be/src/common/daemon.h
index 53cd925ef5..2dc4fd9bc7 100644
--- a/be/src/common/daemon.h
+++ b/be/src/common/daemon.h
@@ -48,6 +48,7 @@ private:
     void tcmalloc_gc_thread();
     void buffer_pool_gc_thread();
     void memory_maintenance_thread();
+    void memory_gc_thread();
     void load_channel_tracker_refresh_thread();
     void calculate_metrics_thread();
 
@@ -56,6 +57,7 @@ private:
     // only buffer pool gc, will be removed after.
     scoped_refptr<Thread> _buffer_pool_gc_thread;
     scoped_refptr<Thread> _memory_maintenance_thread;
+    scoped_refptr<Thread> _memory_gc_thread;
     scoped_refptr<Thread> _load_channel_tracker_refresh_thread;
     scoped_refptr<Thread> _calculate_metrics_thread;
 };
diff --git a/be/src/olap/lru_cache.cpp b/be/src/olap/lru_cache.cpp
index 0eb6fe53bb..81d9803ada 100644
--- a/be/src/olap/lru_cache.cpp
+++ b/be/src/olap/lru_cache.cpp
@@ -442,7 +442,8 @@ ShardedLRUCache::ShardedLRUCache(const std::string& name, 
size_t total_capacity,
           _num_shard_bits(Bits::FindLSBSetNonZero(num_shards)),
           _num_shards(num_shards),
           _shards(nullptr),
-          _last_id(1) {
+          _last_id(1),
+          _total_capacity(total_capacity) {
     _mem_tracker = 
std::make_unique<MemTrackerLimiter>(MemTrackerLimiter::Type::GLOBAL, name);
     CHECK(num_shards > 0) << "num_shards cannot be 0";
     CHECK_EQ((num_shards & (num_shards - 1)), 0)
@@ -534,6 +535,14 @@ int64_t ShardedLRUCache::mem_consumption() {
     return _mem_tracker->consumption();
 }
 
+int64_t ShardedLRUCache::get_usage() {
+    size_t total_usage = 0;
+    for (int i = 0; i < _num_shards; i++) {
+        total_usage += _shards[i]->get_usage();
+    }
+    return total_usage;
+}
+
 void ShardedLRUCache::update_cache_metrics() const {
     size_t total_capacity = 0;
     size_t total_usage = 0;
diff --git a/be/src/olap/lru_cache.h b/be/src/olap/lru_cache.h
index 8d86fa9b1b..7d55d859c9 100644
--- a/be/src/olap/lru_cache.h
+++ b/be/src/olap/lru_cache.h
@@ -219,6 +219,10 @@ public:
 
     virtual int64_t mem_consumption() = 0;
 
+    virtual int64_t get_usage() = 0;
+
+    virtual size_t get_total_capacity() = 0;
+
 private:
     DISALLOW_COPY_AND_ASSIGN(Cache);
 };
@@ -376,6 +380,8 @@ public:
     virtual int64_t prune() override;
     int64_t prune_if(CacheValuePredicate pred, bool lazy_mode = false) 
override;
     int64_t mem_consumption() override;
+    int64_t get_usage() override;
+    size_t get_total_capacity() override { return _total_capacity; };
 
 private:
     void update_cache_metrics() const;
@@ -391,6 +397,7 @@ private:
     const uint32_t _num_shards;
     LRUCache** _shards;
     std::atomic<uint64_t> _last_id;
+    size_t _total_capacity;
 
     std::unique_ptr<MemTrackerLimiter> _mem_tracker;
     std::shared_ptr<MetricEntity> _entity = nullptr;
diff --git a/be/src/olap/segment_loader.h b/be/src/olap/segment_loader.h
index 8535e69281..03a54acf55 100644
--- a/be/src/olap/segment_loader.h
+++ b/be/src/olap/segment_loader.h
@@ -89,6 +89,12 @@ public:
     Status prune();
     int64_t prune_all() { return _cache->prune(); };
     int64_t segment_cache_mem_consumption() { return 
_cache->mem_consumption(); }
+    int64_t segment_cache_get_usage() { return _cache->get_usage(); }
+    double segment_cache_get_usage_ratio() {
+        return _cache->get_total_capacity() == 0
+                       ? 0
+                       : ((double)_cache->get_usage() / 
_cache->get_total_capacity());
+    }
 
 private:
     SegmentLoader();
diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp
index 7d810555ae..a332583739 100644
--- a/be/src/runtime/fragment_mgr.cpp
+++ b/be/src/runtime/fragment_mgr.cpp
@@ -95,6 +95,8 @@ public:
     Status execute();
 
     Status cancel(const PPlanFragmentCancelReason& reason, const std::string& 
msg = "");
+    bool is_canceled() { return _cancelled; }
+
     TUniqueId fragment_instance_id() const { return _fragment_instance_id; }
 
     TUniqueId query_id() const { return _query_id; }
@@ -822,6 +824,20 @@ void FragmentMgr::cancel_query(const TUniqueId& query_id, 
const PPlanFragmentCan
     }
 }
 
+bool FragmentMgr::query_is_canceled(const TUniqueId& query_id) {
+    std::lock_guard<std::mutex> lock(_lock);
+    auto ctx = _fragments_ctx_map.find(query_id);
+    if (ctx != _fragments_ctx_map.end()) {
+        for (auto it : ctx->second->fragment_ids) {
+            auto exec_state_iter = _fragment_map.find(it);
+            if (exec_state_iter != _fragment_map.end() && 
exec_state_iter->second) {
+                return exec_state_iter->second->is_canceled();
+            }
+        }
+    }
+    return true;
+}
+
 void FragmentMgr::cancel_worker() {
     LOG(INFO) << "FragmentMgr cancel worker start working.";
     do {
diff --git a/be/src/runtime/fragment_mgr.h b/be/src/runtime/fragment_mgr.h
index c5411eeed9..dc1567bdc2 100644
--- a/be/src/runtime/fragment_mgr.h
+++ b/be/src/runtime/fragment_mgr.h
@@ -81,6 +81,8 @@ public:
     void cancel_query(const TUniqueId& query_id, const 
PPlanFragmentCancelReason& reason,
                       const std::string& msg = "");
 
+    bool query_is_canceled(const TUniqueId& query_id);
+
     void cancel_worker();
 
     virtual void debug(std::stringstream& ss);
diff --git a/be/src/runtime/memory/mem_tracker_limiter.cpp 
b/be/src/runtime/memory/mem_tracker_limiter.cpp
index 795aeccc94..3293c00914 100644
--- a/be/src/runtime/memory/mem_tracker_limiter.cpp
+++ b/be/src/runtime/memory/mem_tracker_limiter.cpp
@@ -295,6 +295,13 @@ int64_t MemTrackerLimiter::free_top_memory_query(int64_t 
min_free_mem,
         std::lock_guard<std::mutex> l(mem_tracker_limiter_pool[i].group_lock);
         for (auto tracker : mem_tracker_limiter_pool[i].trackers) {
             if (tracker->type() == type) {
+                if (tracker->consumption() <= 104857600) { // 100M small query 
does not cancel
+                    continue;
+                }
+                if (ExecEnv::GetInstance()->fragment_mgr()->query_is_canceled(
+                            label_to_queryid(tracker->label()))) {
+                    continue;
+                }
                 if (tracker->consumption() > min_free_mem) {
                     std::priority_queue<std::pair<int64_t, std::string>,
                                         std::vector<std::pair<int64_t, 
std::string>>,
@@ -334,11 +341,15 @@ int64_t 
MemTrackerLimiter::free_top_overcommit_query(int64_t min_free_mem,
         std::lock_guard<std::mutex> l(mem_tracker_limiter_pool[i].group_lock);
         for (auto tracker : mem_tracker_limiter_pool[i].trackers) {
             if (tracker->type() == type) {
-                int64_t overcommit_ratio =
-                        (static_cast<double>(tracker->consumption()) / 
tracker->limit()) * 10000;
-                if (overcommit_ratio == 0) { // Small query does not cancel
+                if (tracker->consumption() <= 104857600) { // 100M small query 
does not cancel
+                    continue;
+                }
+                if (ExecEnv::GetInstance()->fragment_mgr()->query_is_canceled(
+                            label_to_queryid(tracker->label()))) {
                     continue;
                 }
+                int64_t overcommit_ratio =
+                        (static_cast<double>(tracker->consumption()) / 
tracker->limit()) * 10000;
                 min_pq.push(std::pair<int64_t, std::string>(overcommit_ratio, 
tracker->label()));
                 query_consumption[tracker->label()] = tracker->consumption();
             }
diff --git a/be/src/util/mem_info.cpp b/be/src/util/mem_info.cpp
index 5fb42611ed..818223c1a0 100644
--- a/be/src/util/mem_info.cpp
+++ b/be/src/util/mem_info.cpp
@@ -93,30 +93,43 @@ void MemInfo::refresh_allocator_mem() {
 
 void MemInfo::process_cache_gc(int64_t& freed_mem) {
     // TODO, free more cache, and should free a certain percentage of 
capacity, not all.
-    freed_mem += ChunkAllocator::instance()->mem_consumption();
-    ChunkAllocator::instance()->clear();
-    freed_mem +=
-            
StoragePageCache::instance()->get_page_cache_mem_consumption(segment_v2::DATA_PAGE);
-    StoragePageCache::instance()->prune(segment_v2::DATA_PAGE);
-    // TODO add freed_mem
-    SegmentLoader::instance()->prune();
+    int32_t min_free_size = 33554432; // 32M
+    if (ChunkAllocator::instance()->mem_consumption() > min_free_size) {
+        freed_mem += ChunkAllocator::instance()->mem_consumption();
+        ChunkAllocator::instance()->clear();
+    }
+
+    if 
(StoragePageCache::instance()->get_page_cache_mem_consumption(segment_v2::DATA_PAGE)
 >
+        min_free_size) {
+        freed_mem +=
+                
StoragePageCache::instance()->get_page_cache_mem_consumption(segment_v2::DATA_PAGE);
+        StoragePageCache::instance()->prune(segment_v2::DATA_PAGE);
+    }
 }
 
 // step1: free all cache
 // step2: free top overcommit query, if enable query memroy overcommit
+// TODO Now, the meaning is different from java minor gc + full gc, more like 
small gc + large gc.
 bool MemInfo::process_minor_gc() {
+    MonotonicStopWatch watch;
+    watch.start();
     int64_t freed_mem = 0;
     std::string vm_rss_str = PerfCounters::get_vm_rss_str();
     std::string mem_available_str = MemInfo::sys_mem_available_str();
 
     Defer defer {[&]() {
-        LOG(INFO) << fmt::format("Process Minor GC Free Memory {} Bytes", 
freed_mem);
+        LOG(INFO) << fmt::format("Process Minor GC Free Memory {} Bytes. 
cost(us): {}", freed_mem,
+                                 watch.elapsed_time() / 1000);
     }};
 
     MemInfo::process_cache_gc(freed_mem);
     if (freed_mem > _s_process_minor_gc_size) {
         return true;
     }
+
+    // TODO add freed_mem
+    SegmentLoader::instance()->prune();
+
     if (config::enable_query_memroy_overcommit) {
         freed_mem += MemTrackerLimiter::free_top_overcommit_query(
                 _s_process_minor_gc_size - freed_mem, vm_rss_str, 
mem_available_str);
@@ -132,22 +145,32 @@ bool MemInfo::process_minor_gc() {
 // step3: free top overcommit load, load retries are more expensive, So cancel 
at the end.
 // step4: free top memory load
 bool MemInfo::process_full_gc() {
+    MonotonicStopWatch watch;
+    watch.start();
     int64_t freed_mem = 0;
     std::string vm_rss_str = PerfCounters::get_vm_rss_str();
     std::string mem_available_str = MemInfo::sys_mem_available_str();
 
-    Defer defer {
-            [&]() { LOG(INFO) << fmt::format("Process Full GC Free Memory {} 
Bytes", freed_mem); }};
+    Defer defer {[&]() {
+        LOG(INFO) << fmt::format("Process Full GC Free Memory {} Bytes. 
cost(us): {}", freed_mem,
+                                 watch.elapsed_time() / 1000);
+    }};
 
     MemInfo::process_cache_gc(freed_mem);
     if (freed_mem > _s_process_full_gc_size) {
         return true;
     }
-    freed_mem += SegmentLoader::instance()->segment_cache_mem_consumption();
-    SegmentLoader::instance()->prune_all();
-    if (freed_mem > _s_process_full_gc_size) {
-        return true;
+
+    if (SegmentLoader::instance()->segment_cache_get_usage_ratio() > 0.1) {
+        freed_mem += 
SegmentLoader::instance()->segment_cache_mem_consumption();
+        LOG(INFO) << "prune all " << 
SegmentLoader::instance()->segment_cache_get_usage()
+                  << " entries in segment cache.";
+        SegmentLoader::instance()->prune_all();
+        if (freed_mem > _s_process_full_gc_size) {
+            return true;
+        }
     }
+
     freed_mem += 
MemTrackerLimiter::free_top_memory_query(_s_process_full_gc_size - freed_mem,
                                                           vm_rss_str, 
mem_available_str);
     if (freed_mem > _s_process_full_gc_size) {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to