This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-2.1 by this push:
     new 2ed6a00fd1b [opt](memory) Add GlobalMemoryArbitrator and support ReserveMemory (#34985) (#35070)
2ed6a00fd1b is described below

commit 2ed6a00fd1b6ec64b28c86a130d730f68eab1700
Author: Xinyi Zou <[email protected]>
AuthorDate: Wed May 22 09:53:45 2024 +0800

    [opt](memory) Add GlobalMemoryArbitrator and support ReserveMemory (#34985) (#35070)
---
 be/src/agent/task_worker_pool.cpp                  |   3 +-
 be/src/common/daemon.cpp                           |  25 ++--
 be/src/olap/memtable_memory_limiter.cpp            |  22 ++--
 be/src/olap/olap_server.cpp                        |   5 +-
 be/src/olap/rowset/segcompaction.cpp               |   3 +-
 be/src/olap/rowset_builder.cpp                     |   3 +-
 be/src/runtime/memory/global_memory_arbitrator.cpp |  34 ++++++
 be/src/runtime/memory/global_memory_arbitrator.h   | 136 +++++++++++++++++++++
 be/src/runtime/memory/mem_tracker.h                |   8 +-
 be/src/runtime/memory/mem_tracker_limiter.cpp      |  51 +-------
 be/src/runtime/memory/mem_tracker_limiter.h        |  24 +++-
 be/src/runtime/memory/thread_mem_tracker_mgr.cpp   |   2 +
 be/src/runtime/memory/thread_mem_tracker_mgr.h     | 101 +++++++++++++--
 be/src/runtime/thread_context.h                    |  22 ++++
 be/src/runtime/workload_group/workload_group.cpp   |   3 +-
 .../workload_group/workload_group_manager.cpp      |  10 +-
 be/src/util/mem_info.cpp                           |  53 ++++----
 be/src/util/mem_info.h                             |  32 ++---
 be/src/vec/common/allocator.cpp                    |   9 +-
 19 files changed, 390 insertions(+), 156 deletions(-)

diff --git a/be/src/agent/task_worker_pool.cpp b/be/src/agent/task_worker_pool.cpp
index 30c83fdb878..37ec4bc6da7 100644
--- a/be/src/agent/task_worker_pool.cpp
+++ b/be/src/agent/task_worker_pool.cpp
@@ -73,6 +73,7 @@
 #include "olap/txn_manager.h"
 #include "olap/utils.h"
 #include "runtime/exec_env.h"
+#include "runtime/memory/global_memory_arbitrator.h"
 #include "runtime/snapshot_loader.h"
 #include "service/backend_options.h"
 #include "util/debug_points.h"
@@ -1563,7 +1564,7 @@ void PublishVersionWorkerPool::publish_version_callback(const TAgentTaskRequest&
                 .error(status);
     } else {
         if (!config::disable_auto_compaction &&
-            !MemInfo::is_exceed_soft_mem_limit(GB_EXCHANGE_BYTE)) {
+            
!GlobalMemoryArbitrator::is_exceed_soft_mem_limit(GB_EXCHANGE_BYTE)) {
             for (auto [tablet_id, _] : succ_tablets) {
                 TabletSharedPtr tablet = 
_engine.tablet_manager()->get_tablet(tablet_id);
                 if (tablet != nullptr) {
diff --git a/be/src/common/daemon.cpp b/be/src/common/daemon.cpp
index 8d79e6f2181..4787f603650 100644
--- a/be/src/common/daemon.cpp
+++ b/be/src/common/daemon.cpp
@@ -47,6 +47,7 @@
 #include "runtime/client_cache.h"
 #include "runtime/exec_env.h"
 #include "runtime/fragment_mgr.h"
+#include "runtime/memory/global_memory_arbitrator.h"
 #include "runtime/memory/mem_tracker.h"
 #include "runtime/memory/mem_tracker_limiter.h"
 #include "runtime/runtime_query_statistics_mgr.h"
@@ -191,7 +192,7 @@ void Daemon::memory_maintenance_thread() {
         // Refresh process memory metrics.
         doris::PerfCounters::refresh_proc_status();
         doris::MemInfo::refresh_proc_meminfo();
-        doris::MemInfo::refresh_proc_mem_no_allocator_cache();
+        doris::GlobalMemoryArbitrator::refresh_vm_rss_sub_allocator_cache();
 
         // Update and print memory stat when the memory changes by 256M.
         if (abs(last_print_proc_mem - PerfCounters::get_vm_rss()) > 268435456) 
{
@@ -212,7 +213,7 @@ void Daemon::memory_maintenance_thread() {
 
             
ExecEnv::GetInstance()->brpc_iobuf_block_memory_tracker()->set_consumption(
                     butil::IOBuf::block_memory());
-            LOG(INFO) << MemTrackerLimiter::
+            LOG(INFO) << doris::GlobalMemoryArbitrator::
                             process_mem_log_str(); // print mem log when 
memory state by 256M
         }
     }
@@ -229,21 +230,22 @@ void Daemon::memory_gc_thread() {
             continue;
         }
         auto sys_mem_available = doris::MemInfo::sys_mem_available();
-        auto proc_mem_no_allocator_cache = 
doris::MemInfo::proc_mem_no_allocator_cache();
+        auto process_memory_usage = 
doris::GlobalMemoryArbitrator::process_memory_usage();
 
         // GC excess memory for resource groups that not enable overcommit
-        auto tg_free_mem = doris::MemInfo::tg_not_enable_overcommit_group_gc();
+        auto tg_free_mem = doris::MemInfo::tg_disable_overcommit_group_gc();
         sys_mem_available += tg_free_mem;
-        proc_mem_no_allocator_cache -= tg_free_mem;
+        process_memory_usage -= tg_free_mem;
 
         if (memory_full_gc_sleep_time_ms <= 0 &&
             (sys_mem_available < 
doris::MemInfo::sys_mem_available_low_water_mark() ||
-             proc_mem_no_allocator_cache >= doris::MemInfo::mem_limit())) {
+             process_memory_usage >= doris::MemInfo::mem_limit())) {
             // No longer full gc and minor gc during sleep.
             memory_full_gc_sleep_time_ms = memory_gc_sleep_time_ms;
             memory_minor_gc_sleep_time_ms = memory_gc_sleep_time_ms;
-            LOG(INFO) << fmt::format("[MemoryGC] start full GC, {}.",
-                                     
MemTrackerLimiter::process_limit_exceeded_errmsg_str());
+            LOG(INFO) << fmt::format(
+                    "[MemoryGC] start full GC, {}.",
+                    
doris::GlobalMemoryArbitrator::process_limit_exceeded_errmsg_str());
             doris::MemTrackerLimiter::print_log_process_usage();
             if (doris::MemInfo::process_full_gc()) {
                 // If there is not enough memory to be gc, the process memory 
usage will not be printed in the next continuous gc.
@@ -251,11 +253,12 @@ void Daemon::memory_gc_thread() {
             }
         } else if (memory_minor_gc_sleep_time_ms <= 0 &&
                    (sys_mem_available < 
doris::MemInfo::sys_mem_available_warning_water_mark() ||
-                    proc_mem_no_allocator_cache >= 
doris::MemInfo::soft_mem_limit())) {
+                    process_memory_usage >= doris::MemInfo::soft_mem_limit())) 
{
             // No minor gc during sleep, but full gc is possible.
             memory_minor_gc_sleep_time_ms = memory_gc_sleep_time_ms;
-            LOG(INFO) << fmt::format("[MemoryGC] start minor GC, {}.",
-                                     
MemTrackerLimiter::process_soft_limit_exceeded_errmsg_str());
+            LOG(INFO) << fmt::format(
+                    "[MemoryGC] start minor GC, {}.",
+                    
doris::GlobalMemoryArbitrator::process_soft_limit_exceeded_errmsg_str());
             doris::MemTrackerLimiter::print_log_process_usage();
             if (doris::MemInfo::process_minor_gc()) {
                 doris::MemTrackerLimiter::enable_print_log_process_usage();
diff --git a/be/src/olap/memtable_memory_limiter.cpp b/be/src/olap/memtable_memory_limiter.cpp
index f0a46aaa452..dc128137ae4 100644
--- a/be/src/olap/memtable_memory_limiter.cpp
+++ b/be/src/olap/memtable_memory_limiter.cpp
@@ -88,7 +88,8 @@ int64_t MemTableMemoryLimiter::_avail_mem_lack() {
 int64_t MemTableMemoryLimiter::_proc_mem_extra() {
     // reserve a small amount of memory so we do not trigger MinorGC
     auto reserved_mem = doris::MemInfo::sys_mem_available_low_water_mark();
-    auto proc_mem_extra = MemInfo::proc_mem_no_allocator_cache() - 
MemInfo::soft_mem_limit();
+    auto proc_mem_extra =
+            GlobalMemoryArbitrator::process_memory_usage() - 
MemInfo::soft_mem_limit();
     return proc_mem_extra + reserved_mem;
 }
 
@@ -222,14 +223,17 @@ void MemTableMemoryLimiter::refresh_mem_tracker() {
 
     _last_limit = limit;
     _log_timer.reset();
-    LOG(INFO) << ss.str() << ", process mem: " << 
PerfCounters::get_vm_rss_str()
-              << " (without allocator cache: "
-              << 
PrettyPrinter::print_bytes(MemInfo::proc_mem_no_allocator_cache())
-              << "), load mem: " << 
PrettyPrinter::print_bytes(_mem_tracker->consumption())
-              << ", memtable writers num: " << _writers.size()
-              << " (active: " << PrettyPrinter::print_bytes(_active_mem_usage)
-              << ", write: " << PrettyPrinter::print_bytes(_write_mem_usage)
-              << ", flush: " << PrettyPrinter::print_bytes(_flush_mem_usage) 
<< ")";
+    // if not exist load task, this log should not be printed.
+    if (_mem_usage != 0) {
+        LOG(INFO) << ss.str() << ", process mem: " << 
PerfCounters::get_vm_rss_str()
+                  << " (without allocator cache: "
+                  << 
PrettyPrinter::print_bytes(GlobalMemoryArbitrator::process_memory_usage())
+                  << "), load mem: " << 
PrettyPrinter::print_bytes(_mem_tracker->consumption())
+                  << ", memtable writers num: " << _writers.size()
+                  << " (active: " << 
PrettyPrinter::print_bytes(_active_mem_usage)
+                  << ", write: " << 
PrettyPrinter::print_bytes(_write_mem_usage)
+                  << ", flush: " << 
PrettyPrinter::print_bytes(_flush_mem_usage) << ")";
+    }
 }
 
 void MemTableMemoryLimiter::_refresh_mem_tracker() {
diff --git a/be/src/olap/olap_server.cpp b/be/src/olap/olap_server.cpp
index 0a8bcedf14e..b667a804906 100644
--- a/be/src/olap/olap_server.cpp
+++ b/be/src/olap/olap_server.cpp
@@ -70,6 +70,7 @@
 #include "olap/task/index_builder.h"
 #include "runtime/client_cache.h"
 #include "runtime/memory/cache_manager.h"
+#include "runtime/memory/global_memory_arbitrator.h"
 #include "service/brpc.h"
 #include "service/point_query_executor.h"
 #include "util/brpc_client_cache.h"
@@ -621,7 +622,7 @@ void StorageEngine::_compaction_tasks_producer_callback() {
     int64_t interval = config::generate_compaction_tasks_interval_ms;
     do {
         if (!config::disable_auto_compaction &&
-            !MemInfo::is_exceed_soft_mem_limit(GB_EXCHANGE_BYTE)) {
+            
!GlobalMemoryArbitrator::is_exceed_soft_mem_limit(GB_EXCHANGE_BYTE)) {
             _adjust_compaction_thread_num();
 
             bool check_score = false;
@@ -1359,7 +1360,7 @@ void StorageEngine::_cold_data_compaction_producer_callback() {
     while (!_stop_background_threads_latch.wait_for(
             std::chrono::seconds(config::cold_data_compaction_interval_sec))) {
         if (config::disable_auto_compaction ||
-            MemInfo::is_exceed_soft_mem_limit(GB_EXCHANGE_BYTE)) {
+            
GlobalMemoryArbitrator::is_exceed_soft_mem_limit(GB_EXCHANGE_BYTE)) {
             continue;
         }
 
diff --git a/be/src/olap/rowset/segcompaction.cpp b/be/src/olap/rowset/segcompaction.cpp
index 2de881ee13a..8fee04ccb80 100644
--- a/be/src/olap/rowset/segcompaction.cpp
+++ b/be/src/olap/rowset/segcompaction.cpp
@@ -56,6 +56,7 @@
 #include "olap/storage_engine.h"
 #include "olap/tablet_reader.h"
 #include "olap/tablet_schema.h"
+#include "runtime/memory/global_memory_arbitrator.h"
 #include "runtime/thread_context.h"
 #include "util/debug_points.h"
 #include "util/mem_info.h"
@@ -212,7 +213,7 @@ Status SegcompactionWorker::_create_segment_writer_for_segcompaction(
 Status 
SegcompactionWorker::_do_compact_segments(SegCompactionCandidatesSharedPtr 
segments) {
     
SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(ExecEnv::GetInstance()->segcompaction_mem_tracker());
     /* throttle segcompaction task if memory depleted */
-    if (MemInfo::is_exceed_soft_mem_limit(GB_EXCHANGE_BYTE)) {
+    if (GlobalMemoryArbitrator::is_exceed_soft_mem_limit(GB_EXCHANGE_BYTE)) {
         return Status::Error<FETCH_MEMORY_EXCEEDED>("skip segcompaction due to 
memory shortage");
     }
 
diff --git a/be/src/olap/rowset_builder.cpp b/be/src/olap/rowset_builder.cpp
index 2153a9ad1a8..7ff06b39eb0 100644
--- a/be/src/olap/rowset_builder.cpp
+++ b/be/src/olap/rowset_builder.cpp
@@ -48,6 +48,7 @@
 #include "olap/tablet_meta.h"
 #include "olap/tablet_schema.h"
 #include "olap/txn_manager.h"
+#include "runtime/memory/global_memory_arbitrator.h"
 #include "util/brpc_client_cache.h"
 #include "util/mem_info.h"
 #include "util/ref_count_closure.h"
@@ -140,7 +141,7 @@ Status RowsetBuilder::init_mow_context(std::shared_ptr<MowContext>& mow_context)
 
 Status RowsetBuilder::check_tablet_version_count() {
     if (!_tablet->exceed_version_limit(config::max_tablet_version_num - 100) ||
-        MemInfo::is_exceed_soft_mem_limit(GB_EXCHANGE_BYTE)) {
+        GlobalMemoryArbitrator::is_exceed_soft_mem_limit(GB_EXCHANGE_BYTE)) {
         return Status::OK();
     }
     //trigger compaction
diff --git a/be/src/runtime/memory/global_memory_arbitrator.cpp b/be/src/runtime/memory/global_memory_arbitrator.cpp
new file mode 100644
index 00000000000..dc686f7c5ab
--- /dev/null
+++ b/be/src/runtime/memory/global_memory_arbitrator.cpp
@@ -0,0 +1,34 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "runtime/memory/global_memory_arbitrator.h"
+
+#include <bvar/bvar.h>
+
+namespace doris {
+
+bvar::PassiveStatus<int64_t> g_vm_rss_sub_allocator_cache(
+        "meminfo_vm_rss_sub_allocator_cache",
+        [](void*) { return 
GlobalMemoryArbitrator::vm_rss_sub_allocator_cache(); }, nullptr);
+bvar::PassiveStatus<int64_t> g_process_memory_usage(
+        "meminfo_process_memory_usage",
+        [](void*) { return GlobalMemoryArbitrator::process_memory_usage(); }, 
nullptr);
+
+std::atomic<int64_t> GlobalMemoryArbitrator::_s_vm_rss_sub_allocator_cache = 
-1;
+std::atomic<int64_t> GlobalMemoryArbitrator::_s_process_reserved_memory = 0;
+
+} // namespace doris
diff --git a/be/src/runtime/memory/global_memory_arbitrator.h b/be/src/runtime/memory/global_memory_arbitrator.h
new file mode 100644
index 00000000000..b1879cb1a7b
--- /dev/null
+++ b/be/src/runtime/memory/global_memory_arbitrator.h
@@ -0,0 +1,136 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include "util/mem_info.h"
+
+namespace doris {
+
+class GlobalMemoryArbitrator {
+public:
+    /** jemalloc pdirty is number of pages within unused extents that are 
potentially
+      * dirty, and for which madvise() or similar has not been called.
+      *
+      * So they will be subtracted from RSS to make accounting more
+      * accurate, since those pages are not really RSS but a memory
+      * that can be used at anytime via jemalloc.
+      */
+    static inline void refresh_vm_rss_sub_allocator_cache() {
+        _s_vm_rss_sub_allocator_cache.store(
+                PerfCounters::get_vm_rss() - 
static_cast<int64_t>(MemInfo::allocator_cache_mem()),
+                std::memory_order_relaxed);
+        MemInfo::refresh_interval_memory_growth = 0;
+    }
+    static inline int64_t vm_rss_sub_allocator_cache() {
+        return _s_vm_rss_sub_allocator_cache.load(std::memory_order_relaxed);
+    }
+
+    // If need to use process memory in your execution logic, pls use it.
+    // equal to real process memory(vm_rss), subtract jemalloc dirty page 
cache,
+    // add reserved memory and growth memory since the last vm_rss update.
+    static inline int64_t process_memory_usage() {
+        return vm_rss_sub_allocator_cache() +
+               
MemInfo::refresh_interval_memory_growth.load(std::memory_order_relaxed) +
+               process_reserved_memory();
+    }
+
+    static inline bool try_reserve_process_memory(int64_t bytes) {
+        if (MemInfo::sys_mem_available() - bytes < 
MemInfo::sys_mem_available_low_water_mark()) {
+            return false;
+        }
+        int64_t old_reserved_mem = 
_s_process_reserved_memory.load(std::memory_order_relaxed);
+        int64_t new_reserved_mem = 0;
+        do {
+            new_reserved_mem = old_reserved_mem + bytes;
+            if (UNLIKELY(vm_rss_sub_allocator_cache() +
+                                 MemInfo::refresh_interval_memory_growth.load(
+                                         std::memory_order_relaxed) +
+                                 new_reserved_mem >=
+                         MemInfo::mem_limit())) {
+                return false;
+            }
+        } while (!_s_process_reserved_memory.compare_exchange_weak(
+                old_reserved_mem, new_reserved_mem, 
std::memory_order_relaxed));
+        return true;
+    }
+
+    static inline void release_process_reserved_memory(int64_t bytes) {
+        _s_process_reserved_memory.fetch_sub(bytes, std::memory_order_relaxed);
+    }
+
+    static inline int64_t process_reserved_memory() {
+        return _s_process_reserved_memory.load(std::memory_order_relaxed);
+    }
+
+    static bool is_exceed_soft_mem_limit(int64_t bytes = 0) {
+        return process_memory_usage() + bytes >= MemInfo::soft_mem_limit() ||
+               MemInfo::sys_mem_available() - bytes <
+                       MemInfo::sys_mem_available_warning_water_mark();
+    }
+
+    static bool is_exceed_hard_mem_limit(int64_t bytes = 0) {
+        // Limit process memory usage using the actual physical memory of the 
process in `/proc/self/status`.
+        // This is independent of the consumption value of the mem tracker, 
which counts the virtual memory
+        // of the process malloc.
+        // for fast, expect MemInfo::initialized() to be true.
+        //
+        // tcmalloc/jemalloc allocator cache does not participate in the mem 
check as part of the process physical memory.
+        // because `new/malloc` will trigger mem hook when using 
tcmalloc/jemalloc allocator cache,
+        // but it may not actually alloc physical memory, which is not 
expected in mem hook fail.
+        return process_memory_usage() + bytes >= MemInfo::mem_limit() ||
+               MemInfo::sys_mem_available() - bytes < 
MemInfo::sys_mem_available_low_water_mark();
+    }
+
+    static std::string process_mem_log_str() {
+        return fmt::format(
+                "os physical memory {}. process memory used {}, limit {}, soft 
limit {}. sys "
+                "available memory {}, low water mark {}, warning water mark 
{}. Refresh interval "
+                "memory growth {} B",
+                PrettyPrinter::print(MemInfo::physical_mem(), TUnit::BYTES),
+                PerfCounters::get_vm_rss_str(), MemInfo::mem_limit_str(),
+                MemInfo::soft_mem_limit_str(), 
MemInfo::sys_mem_available_str(),
+                
PrettyPrinter::print(MemInfo::sys_mem_available_low_water_mark(), TUnit::BYTES),
+                
PrettyPrinter::print(MemInfo::sys_mem_available_warning_water_mark(), 
TUnit::BYTES),
+                MemInfo::refresh_interval_memory_growth);
+    }
+
+    static std::string process_limit_exceeded_errmsg_str() {
+        return fmt::format(
+                "process memory used {} exceed limit {} or sys available 
memory {} less than low "
+                "water mark {}",
+                PerfCounters::get_vm_rss_str(), MemInfo::mem_limit_str(),
+                MemInfo::sys_mem_available_str(),
+                
PrettyPrinter::print(MemInfo::sys_mem_available_low_water_mark(), 
TUnit::BYTES));
+    }
+
+    static std::string process_soft_limit_exceeded_errmsg_str() {
+        return fmt::format(
+                "process memory used {} exceed soft limit {} or sys available 
memory {} less than "
+                "warning water mark {}.",
+                PerfCounters::get_vm_rss_str(), MemInfo::soft_mem_limit_str(),
+                MemInfo::sys_mem_available_str(),
+                
PrettyPrinter::print(MemInfo::sys_mem_available_warning_water_mark(),
+                                     TUnit::BYTES));
+    }
+
+private:
+    static std::atomic<int64_t> _s_vm_rss_sub_allocator_cache;
+    static std::atomic<int64_t> _s_process_reserved_memory;
+};
+
+} // namespace doris
diff --git a/be/src/runtime/memory/mem_tracker.h b/be/src/runtime/memory/mem_tracker.h
index de7628c1749..d308d201901 100644
--- a/be/src/runtime/memory/mem_tracker.h
+++ b/be/src/runtime/memory/mem_tracker.h
@@ -70,7 +70,7 @@ public:
         MemCounter() : _current_value(0), _peak_value(0) {}
 
         void add(int64_t delta) {
-            auto value = _current_value.fetch_add(delta, 
std::memory_order_relaxed) + delta;
+            int64_t value = _current_value.fetch_add(delta, 
std::memory_order_relaxed) + delta;
             update_peak(value);
         }
 
@@ -79,8 +79,8 @@ public:
         }
 
         bool try_add(int64_t delta, int64_t max) {
-            auto cur_val = _current_value.load(std::memory_order_relaxed);
-            auto new_val = 0;
+            int64_t cur_val = _current_value.load(std::memory_order_relaxed);
+            int64_t new_val = 0;
             do {
                 new_val = cur_val + delta;
                 if (UNLIKELY(new_val > max)) {
@@ -100,7 +100,7 @@ public:
         }
 
         void update_peak(int64_t value) {
-            auto pre_value = _peak_value.load(std::memory_order_relaxed);
+            int64_t pre_value = _peak_value.load(std::memory_order_relaxed);
             while (value > pre_value && !_peak_value.compare_exchange_weak(
                                                 pre_value, value, 
std::memory_order_relaxed)) {
             }
diff --git a/be/src/runtime/memory/mem_tracker_limiter.cpp b/be/src/runtime/memory/mem_tracker_limiter.cpp
index 2218bed6959..b84f7c54957 100644
--- a/be/src/runtime/memory/mem_tracker_limiter.cpp
+++ b/be/src/runtime/memory/mem_tracker_limiter.cpp
@@ -31,6 +31,7 @@
 #include "olap/memtable_memory_limiter.h"
 #include "runtime/exec_env.h"
 #include "runtime/fragment_mgr.h"
+#include "runtime/memory/global_memory_arbitrator.h"
 #include "runtime/thread_context.h"
 #include "runtime/workload_group/workload_group.h"
 #include "service/backend_options.h"
@@ -306,7 +307,7 @@ void MemTrackerLimiter::print_log_usage(const std::string& msg) {
     if (_enable_print_log_usage) {
         _enable_print_log_usage = false;
         std::string detail = msg;
-        detail += "\nProcess Memory Summary:\n    " + 
MemTrackerLimiter::process_mem_log_str();
+        detail += "\nProcess Memory Summary:\n    " + 
GlobalMemoryArbitrator::process_mem_log_str();
         detail += "\nMemory Tracker Summary:    " + log_usage();
         std::string child_trackers_usage;
         std::vector<MemTracker::Snapshot> snapshots;
@@ -324,7 +325,7 @@ void MemTrackerLimiter::print_log_usage(const std::string& msg) {
 
 std::string MemTrackerLimiter::log_process_usage_str() {
     std::string detail;
-    detail += "\nProcess Memory Summary:\n    " + 
MemTrackerLimiter::process_mem_log_str();
+    detail += "\nProcess Memory Summary:\n    " + 
GlobalMemoryArbitrator::process_mem_log_str();
     std::vector<MemTracker::Snapshot> snapshots;
     MemTrackerLimiter::make_process_snapshots(&snapshots);
     MemTrackerLimiter::make_type_snapshots(&snapshots, 
MemTrackerLimiter::Type::GLOBAL);
@@ -355,50 +356,6 @@ void MemTrackerLimiter::print_log_process_usage() {
     }
 }
 
-bool MemTrackerLimiter::sys_mem_exceed_limit_check(int64_t bytes) {
-    // Limit process memory usage using the actual physical memory of the 
process in `/proc/self/status`.
-    // This is independent of the consumption value of the mem tracker, which 
counts the virtual memory
-    // of the process malloc.
-    // for fast, expect MemInfo::initialized() to be true.
-    //
-    // tcmalloc/jemalloc allocator cache does not participate in the mem check 
as part of the process physical memory.
-    // because `new/malloc` will trigger mem hook when using tcmalloc/jemalloc 
allocator cache,
-    // but it may not actually alloc physical memory, which is not expected in 
mem hook fail.
-    return MemInfo::proc_mem_no_allocator_cache() + bytes >= 
MemInfo::mem_limit() ||
-           MemInfo::sys_mem_available() < 
MemInfo::sys_mem_available_low_water_mark();
-}
-
-std::string MemTrackerLimiter::process_mem_log_str() {
-    return fmt::format(
-            "os physical memory {}. process memory used {}, limit {}, soft 
limit {}. sys "
-            "available memory {}, low water mark {}, warning water mark {}. 
Refresh interval "
-            "memory growth {} B",
-            PrettyPrinter::print(MemInfo::physical_mem(), TUnit::BYTES),
-            PerfCounters::get_vm_rss_str(), MemInfo::mem_limit_str(), 
MemInfo::soft_mem_limit_str(),
-            MemInfo::sys_mem_available_str(),
-            PrettyPrinter::print(MemInfo::sys_mem_available_low_water_mark(), 
TUnit::BYTES),
-            
PrettyPrinter::print(MemInfo::sys_mem_available_warning_water_mark(), 
TUnit::BYTES),
-            MemInfo::refresh_interval_memory_growth);
-}
-
-std::string MemTrackerLimiter::process_limit_exceeded_errmsg_str() {
-    return fmt::format(
-            "process memory used {} exceed limit {} or sys available memory {} 
less than low "
-            "water mark {}",
-            PerfCounters::get_vm_rss_str(), MemInfo::mem_limit_str(),
-            MemInfo::sys_mem_available_str(),
-            PrettyPrinter::print(MemInfo::sys_mem_available_low_water_mark(), 
TUnit::BYTES));
-}
-
-std::string MemTrackerLimiter::process_soft_limit_exceeded_errmsg_str() {
-    return fmt::format(
-            "process memory used {} exceed soft limit {} or sys available 
memory {} less than "
-            "warning water mark {}.",
-            PerfCounters::get_vm_rss_str(), MemInfo::soft_mem_limit_str(),
-            MemInfo::sys_mem_available_str(),
-            
PrettyPrinter::print(MemInfo::sys_mem_available_warning_water_mark(), 
TUnit::BYTES));
-}
-
 std::string MemTrackerLimiter::tracker_limit_exceeded_str() {
     std::string err_msg = fmt::format(
             "memory tracker limit exceeded, tracker label:{}, type:{}, limit "
@@ -490,7 +447,7 @@ int64_t MemTrackerLimiter::free_top_memory_query(
                     } else if (tracker->consumption() + prepare_free_mem < 
min_free_mem) {
                         min_pq.emplace(tracker->consumption(), 
tracker->label());
                         prepare_free_mem += tracker->consumption();
-                    } else if (tracker->consumption() > min_pq.top().first) {
+                    } else if (!min_pq.empty() && tracker->consumption() > 
min_pq.top().first) {
                         min_pq.emplace(tracker->consumption(), 
tracker->label());
                         prepare_free_mem += tracker->consumption();
                         while (prepare_free_mem - min_pq.top().first > 
min_free_mem) {
diff --git a/be/src/runtime/memory/mem_tracker_limiter.h b/be/src/runtime/memory/mem_tracker_limiter.h
index 2e510e1f462..3a891ca3a14 100644
--- a/be/src/runtime/memory/mem_tracker_limiter.h
+++ b/be/src/runtime/memory/mem_tracker_limiter.h
@@ -43,7 +43,7 @@ namespace doris {
 
 class RuntimeProfile;
 
-constexpr auto MEM_TRACKER_GROUP_NUM = 1000;
+constexpr size_t MEM_TRACKER_GROUP_NUM = 1000;
 
 struct TrackerLimiterGroup {
     // Note! in order to enable ExecEnv::mem_tracker_limiter_pool support 
resize,
@@ -129,8 +129,6 @@ public:
         __builtin_unreachable();
     }
 
-    static bool sys_mem_exceed_limit_check(int64_t bytes);
-
     void set_consumption() { LOG(FATAL) << "MemTrackerLimiter set_consumption 
not supported"; }
     Type type() const { return _type; }
     int64_t group_num() const { return _group_num; }
@@ -138,6 +136,23 @@ public:
     int64_t limit() const { return _limit; }
     bool limit_exceeded() const { return _limit >= 0 && _limit < 
consumption(); }
 
+    bool try_consume(int64_t bytes) const {
+        if (UNLIKELY(bytes == 0)) {
+            return true;
+        }
+        bool st = true;
+        if (is_overcommit_tracker() && config::enable_query_memory_overcommit) 
{
+            st = _consumption->try_add(bytes, _limit);
+        } else {
+            _consumption->add(bytes);
+        }
+        if (st && _query_statistics) {
+            
_query_statistics->set_max_peak_memory_bytes(_consumption->peak_value());
+            
_query_statistics->set_current_used_memory_bytes(_consumption->current_value());
+        }
+        return st;
+    }
+
     Status check_limit(int64_t bytes = 0);
     bool is_overcommit_tracker() const { return type() == Type::QUERY || 
type() == Type::LOAD; }
 
@@ -222,9 +237,6 @@ public:
         return querytid;
     }
 
-    static std::string process_mem_log_str();
-    static std::string process_limit_exceeded_errmsg_str();
-    static std::string process_soft_limit_exceeded_errmsg_str();
     // Log the memory usage when memory limit is exceeded.
     std::string tracker_limit_exceeded_str();
 
diff --git a/be/src/runtime/memory/thread_mem_tracker_mgr.cpp b/be/src/runtime/memory/thread_mem_tracker_mgr.cpp
index bc962b51480..766ee643584 100644
--- a/be/src/runtime/memory/thread_mem_tracker_mgr.cpp
+++ b/be/src/runtime/memory/thread_mem_tracker_mgr.cpp
@@ -44,6 +44,7 @@ private:
 void ThreadMemTrackerMgr::attach_limiter_tracker(
         const std::shared_ptr<MemTrackerLimiter>& mem_tracker) {
     DCHECK(mem_tracker);
+    DCHECK(_reserved_mem == 0);
     CHECK(init());
     flush_untracked_mem();
     _limiter_tracker = mem_tracker;
@@ -53,6 +54,7 @@ void ThreadMemTrackerMgr::attach_limiter_tracker(
 void ThreadMemTrackerMgr::detach_limiter_tracker(
         const std::shared_ptr<MemTrackerLimiter>& old_mem_tracker) {
     CHECK(init());
+    release_reserved();
     flush_untracked_mem();
     _limiter_tracker = old_mem_tracker;
     _limiter_tracker_raw = old_mem_tracker.get();
diff --git a/be/src/runtime/memory/thread_mem_tracker_mgr.h b/be/src/runtime/memory/thread_mem_tracker_mgr.h
index 40fe6e13032..6081b013346 100644
--- a/be/src/runtime/memory/thread_mem_tracker_mgr.h
+++ b/be/src/runtime/memory/thread_mem_tracker_mgr.h
@@ -30,6 +30,7 @@
 
 #include "common/config.h"
 #include "runtime/exec_env.h"
+#include "runtime/memory/global_memory_arbitrator.h"
 #include "runtime/memory/mem_tracker.h"
 #include "runtime/memory/mem_tracker_limiter.h"
 #include "util/stack_util.h"
@@ -37,6 +38,8 @@
 
 namespace doris {
 
+constexpr size_t SYNC_PROC_RESERVED_INTERVAL_BYTES = (1ULL << 20); // 1M
+
 // Memory Hook is counted in the memory tracker of the current thread.
 class ThreadMemTrackerMgr {
 public:
@@ -69,14 +72,14 @@ public:
 
     void start_count_scope_mem() {
         CHECK(init());
-        _scope_mem = 0;
+        _scope_mem = _reserved_mem; // consume in advance
         _count_scope_mem = true;
     }
 
     int64_t stop_count_scope_mem() {
         flush_untracked_mem();
         _count_scope_mem = false;
-        return _scope_mem;
+        return _scope_mem - _reserved_mem;
     }
 
     // Note that, If call the memory allocation operation in Memory Hook,
@@ -86,6 +89,9 @@ public:
     void consume(int64_t size, int skip_large_memory_check = 0);
     void flush_untracked_mem();
 
+    bool try_reserve(int64_t size);
+    void release_reserved();
+
     bool is_attach_query() { return _query_id != TUniqueId(); }
 
     bool is_query_cancelled() const { return _is_query_cancelled; }
@@ -123,7 +129,9 @@ private:
     bool _init = false;
     // Cache untracked mem.
     int64_t _untracked_mem = 0;
-    int64_t old_untracked_mem = 0;
+    int64_t _old_untracked_mem = 0;
+
+    int64_t _reserved_mem = 0;
 
     bool _count_scope_mem = false;
     int64_t _scope_mem = 0;
@@ -164,16 +172,43 @@ inline bool 
ThreadMemTrackerMgr::push_consumer_tracker(MemTracker* tracker) {
     }
     _consumer_tracker_stack.push_back(tracker);
     tracker->release(_untracked_mem);
+    tracker->consume(_reserved_mem); // consume in advance
     return true;
 }
 
 inline void ThreadMemTrackerMgr::pop_consumer_tracker() {
     DCHECK(!_consumer_tracker_stack.empty());
     _consumer_tracker_stack.back()->consume(_untracked_mem);
+    _consumer_tracker_stack.back()->release(_reserved_mem);
     _consumer_tracker_stack.pop_back();
 }
 
 inline void ThreadMemTrackerMgr::consume(int64_t size, int 
skip_large_memory_check) {
+    if (_reserved_mem != 0) {
+        if (_reserved_mem >= size) {
+            // only need to subtract from _reserved_mem, no need to consume MemTracker.
+            // each time the accumulated size subtracted from _reserved_mem reaches SYNC_PROC_RESERVED_INTERVAL_BYTES,
+            // subtract it from process global reserved memory,
+            // because this part of the reserved memory has already been used by BE process.
+            _reserved_mem -= size;
+            // store bytes that are not yet synchronized to process reserved memory.
+            _untracked_mem += size;
+            if (_untracked_mem >= SYNC_PROC_RESERVED_INTERVAL_BYTES) {
+                
doris::GlobalMemoryArbitrator::release_process_reserved_memory(_untracked_mem);
+                _untracked_mem = 0;
+            }
+            return;
+        } else {
+            // reserved memory is insufficient: the remaining _reserved_mem is subtracted from the size being consumed,
+            // _reserved_mem is reset to 0, and the remaining _reserved_mem (plus unsynchronized _untracked_mem) is
+            // subtracted from process global reserved memory, meaning all reserved memory has been used by BE process.
+            size -= _reserved_mem;
+            
doris::GlobalMemoryArbitrator::release_process_reserved_memory(_reserved_mem +
+                                                                           
_untracked_mem);
+            _reserved_mem = 0;
+            _untracked_mem = 0;
+        }
+    }
     _untracked_mem += size;
     if (!_init && !ExecEnv::ready()) {
         return;
@@ -205,18 +240,64 @@ inline void ThreadMemTrackerMgr::consume(int64_t size, 
int skip_large_memory_che
 inline void ThreadMemTrackerMgr::flush_untracked_mem() {
     // Temporary memory may be allocated during the consumption of the mem tracker, which will lead to entering
     // the Memory Hook again, so suspend consumption to avoid falling into an infinite loop.
-    if (!init()) return;
+    if (_untracked_mem == 0 || !init()) {
+        return;
+    }
     _stop_consume = true;
     DCHECK(_limiter_tracker_raw);
 
-    old_untracked_mem = _untracked_mem;
-    if (_count_scope_mem) _scope_mem += _untracked_mem;
-    _limiter_tracker_raw->consume(old_untracked_mem);
-    for (auto tracker : _consumer_tracker_stack) {
-        tracker->consume(old_untracked_mem);
+    _old_untracked_mem = _untracked_mem;
+    if (_count_scope_mem) {
+        _scope_mem += _untracked_mem;
     }
-    _untracked_mem -= old_untracked_mem;
+    _limiter_tracker_raw->consume(_old_untracked_mem);
+    for (auto* tracker : _consumer_tracker_stack) {
+        tracker->consume(_old_untracked_mem);
+    }
+    _untracked_mem -= _old_untracked_mem;
     _stop_consume = false;
 }
 
+inline bool ThreadMemTrackerMgr::try_reserve(int64_t size) {
+    DCHECK(_limiter_tracker_raw);
+    DCHECK(size >= 0);
+    CHECK(init());
+    // if _reserved_mem is not equal to 0, this is a repeated reserve, and
+    // _untracked_mem stores bytes that are not yet synchronized to process reserved memory.
+    if (_reserved_mem == 0) {
+        flush_untracked_mem();
+    }
+    if (!_limiter_tracker_raw->try_consume(size)) {
+        return false;
+    }
+    if (!doris::GlobalMemoryArbitrator::try_reserve_process_memory(size)) {
+        _limiter_tracker_raw->release(size); // rollback
+        return false;
+    }
+    if (_count_scope_mem) {
+        _scope_mem += size;
+    }
+    for (auto* tracker : _consumer_tracker_stack) {
+        tracker->consume(size);
+    }
+    _reserved_mem += size;
+    DCHECK(_untracked_mem == 0);
+    return true;
+}
+
+inline void ThreadMemTrackerMgr::release_reserved() {
+    flush_untracked_mem();
+    if (_reserved_mem > 0) {
+        
doris::GlobalMemoryArbitrator::release_process_reserved_memory(_reserved_mem);
+        _limiter_tracker_raw->consume(-_reserved_mem);
+        if (_count_scope_mem) {
+            _scope_mem -= _reserved_mem;
+        }
+        for (auto* tracker : _consumer_tracker_stack) {
+            tracker->consume(-_reserved_mem);
+        }
+        _reserved_mem = 0;
+    }
+}
+
 } // namespace doris
diff --git a/be/src/runtime/thread_context.h b/be/src/runtime/thread_context.h
index 84b24e79657..72d3c8111f6 100644
--- a/be/src/runtime/thread_context.h
+++ b/be/src/runtime/thread_context.h
@@ -71,6 +71,9 @@
 #define SCOPED_CONSUME_MEM_TRACKER_BY_HOOK(mem_tracker) \
     auto VARNAME_LINENUM(add_mem_consumer) = 
doris::AddThreadMemTrackerConsumerByHook(mem_tracker)
 
+#define DEFER_RELEASE_RESERVED() \
+    Defer VARNAME_LINENUM(defer) {[&]() { 
doris::thread_context()->release_reserved_memory(); }};
+
 #define ORPHAN_TRACKER_CHECK()                                                 
 \
     DCHECK(doris::k_doris_exit || !doris::config::enable_memory_orphan_check 
|| \
            doris::thread_context()->thread_mem_tracker()->label() != "Orphan") 
 \
@@ -93,6 +96,7 @@
     auto VARNAME_LINENUM(scoped_tls_mcbh) = doris::ScopedInitThreadContext()
 #define SCOPED_CONSUME_MEM_TRACKER_BY_HOOK(mem_tracker) \
     auto VARNAME_LINENUM(scoped_tls_cmtbh) = doris::ScopedInitThreadContext()
+#define DEFER_RELEASE_RESERVED() (void)0
 #define ORPHAN_TRACKER_CHECK() (void)0
 #define MEMORY_ORPHAN_CHECK() (void)0
 #endif
@@ -203,6 +207,24 @@ public:
         thread_mem_tracker_mgr->consume(size, skip_large_memory_check);
     }
 
+    bool try_reserve_memory(const int64_t size) const {
+#ifdef USE_MEM_TRACKER
+        DCHECK(doris::k_doris_exit || 
!doris::config::enable_memory_orphan_check ||
+               thread_mem_tracker()->label() != "Orphan")
+                << doris::memory_orphan_check_msg;
+#endif
+        return thread_mem_tracker_mgr->try_reserve(size);
+    }
+
+    void release_reserved_memory() const {
+#ifdef USE_MEM_TRACKER
+        DCHECK(doris::k_doris_exit || 
!doris::config::enable_memory_orphan_check ||
+               thread_mem_tracker()->label() != "Orphan")
+                << doris::memory_orphan_check_msg;
+#endif
+        thread_mem_tracker_mgr->release_reserved();
+    }
+
     int thread_local_handle_count = 0;
     int skip_memory_check = 0;
     int skip_large_memory_check = 0;
diff --git a/be/src/runtime/workload_group/workload_group.cpp 
b/be/src/runtime/workload_group/workload_group.cpp
index 39e411de726..fd885aaacb1 100644
--- a/be/src/runtime/workload_group/workload_group.cpp
+++ b/be/src/runtime/workload_group/workload_group.cpp
@@ -30,6 +30,7 @@
 #include "pipeline/task_queue.h"
 #include "pipeline/task_scheduler.h"
 #include "runtime/exec_env.h"
+#include "runtime/memory/global_memory_arbitrator.h"
 #include "runtime/memory/mem_tracker_limiter.h"
 #include "util/mem_info.h"
 #include "util/parse_util.h"
@@ -171,7 +172,7 @@ int64_t WorkloadGroup::gc_memory(int64_t need_free_mem, 
RuntimeProfile* profile,
                     MemTracker::print_bytes(_memory_limit), 
BackendOptions::get_localhost());
         }
     }
-    std::string process_mem_usage_str = 
MemTrackerLimiter::process_mem_log_str();
+    std::string process_mem_usage_str = 
GlobalMemoryArbitrator::process_mem_log_str();
     auto cancel_top_overcommit_str = [cancel_str, 
process_mem_usage_str](int64_t mem_consumption,
                                                                          const 
std::string& label) {
         return fmt::format(
diff --git a/be/src/runtime/workload_group/workload_group_manager.cpp 
b/be/src/runtime/workload_group/workload_group_manager.cpp
index 153e9bab8ce..dc4c73782e4 100644
--- a/be/src/runtime/workload_group/workload_group_manager.cpp
+++ b/be/src/runtime/workload_group/workload_group_manager.cpp
@@ -177,29 +177,29 @@ void WorkloadGroupMgr::refresh_wg_memory_info() {
         wgs_mem_info[wg_id] = {wg_total_mem_used};
     }
 
+    // *TODO*, modify to use doris::GlobalMemoryArbitrator::process_memory_usage().
     auto proc_vm_rss = PerfCounters::get_vm_rss();
     if (all_queries_mem_used <= 0) {
         return;
     }
 
-    auto process_mem_used = doris::MemInfo::proc_mem_no_allocator_cache();
     if (proc_vm_rss < all_queries_mem_used) {
         all_queries_mem_used = proc_vm_rss;
     }
 
     // process memory used is actually bigger than all_queries_mem_used,
     // because memory of page cache, allocator cache, segment cache etc. are included
-    // in process_mem_used.
+    // in proc_vm_rss.
     // we count these cache memories equally on workload groups.
     double ratio = (double)proc_vm_rss / (double)all_queries_mem_used;
     if (ratio <= 1.25) {
-        auto sys_mem_available = doris::MemInfo::sys_mem_available();
         std::string debug_msg = fmt::format(
                 "\nProcess Memory Summary: process_vm_rss: {}, process mem: 
{}, sys mem available: "
                 "{}, all quries mem: {}",
                 PrettyPrinter::print(proc_vm_rss, TUnit::BYTES),
-                PrettyPrinter::print(process_mem_used, TUnit::BYTES),
-                PrettyPrinter::print(sys_mem_available, TUnit::BYTES),
+                
PrettyPrinter::print(doris::GlobalMemoryArbitrator::process_memory_usage(),
+                                     TUnit::BYTES),
+                doris::MemInfo::sys_mem_available_str(),
                 PrettyPrinter::print(all_queries_mem_used, TUnit::BYTES));
         LOG_EVERY_T(INFO, 10) << debug_msg;
     }
diff --git a/be/src/util/mem_info.cpp b/be/src/util/mem_info.cpp
index ce42d69dccf..a2cb04049db 100644
--- a/be/src/util/mem_info.cpp
+++ b/be/src/util/mem_info.cpp
@@ -56,9 +56,6 @@ namespace doris {
 
 bvar::PassiveStatus<int64_t> g_sys_mem_avail(
         "meminfo_sys_mem_avail", [](void*) { return 
MemInfo::sys_mem_available(); }, nullptr);
-bvar::PassiveStatus<int64_t> g_proc_mem_no_allocator_cache(
-        "meminfo_proc_mem_no_allocator_cache",
-        [](void*) { return MemInfo::proc_mem_no_allocator_cache(); }, nullptr);
 
 bool MemInfo::_s_initialized = false;
 std::atomic<int64_t> MemInfo::_s_physical_mem = 
std::numeric_limits<int64_t>::max();
@@ -68,7 +65,6 @@ std::atomic<int64_t> MemInfo::_s_soft_mem_limit = 
std::numeric_limits<int64_t>::
 std::atomic<int64_t> MemInfo::_s_allocator_cache_mem = 0;
 std::string MemInfo::_s_allocator_cache_mem_str = "";
 std::atomic<int64_t> MemInfo::_s_virtual_memory_used = 0;
-std::atomic<int64_t> MemInfo::_s_proc_mem_no_allocator_cache = -1;
 std::atomic<int64_t> MemInfo::refresh_interval_memory_growth = 0;
 
 static std::unordered_map<std::string, int64_t> _mem_info_bytes;
@@ -129,7 +125,7 @@ bool MemInfo::process_minor_gc() {
     std::string pre_sys_mem_available = MemInfo::sys_mem_available_str();
 
     Defer defer {[&]() {
-        notify_je_purge_dirty_pages();
+        MemInfo::notify_je_purge_dirty_pages();
         std::stringstream ss;
         profile->pretty_print(&ss);
         LOG(INFO) << fmt::format(
@@ -139,16 +135,16 @@ bool MemInfo::process_minor_gc() {
     }};
 
     freed_mem += 
CacheManager::instance()->for_each_cache_prune_stale(profile.get());
-    notify_je_purge_dirty_pages();
-    if (freed_mem > _s_process_minor_gc_size) {
+    MemInfo::notify_je_purge_dirty_pages();
+    if (freed_mem > MemInfo::process_minor_gc_size()) {
         return true;
     }
 
     if (config::enable_workload_group_memory_gc) {
         RuntimeProfile* tg_profile = profile->create_child("WorkloadGroup", 
true, true);
-        freed_mem += tg_enable_overcommit_group_gc(_s_process_minor_gc_size - 
freed_mem, tg_profile,
-                                                   true);
-        if (freed_mem > _s_process_minor_gc_size) {
+        freed_mem += 
tg_enable_overcommit_group_gc(MemInfo::process_minor_gc_size() - freed_mem,
+                                                   tg_profile, true);
+        if (freed_mem > MemInfo::process_minor_gc_size()) {
             return true;
         }
     }
@@ -160,9 +156,9 @@ bool MemInfo::process_minor_gc() {
         RuntimeProfile* toq_profile =
                 profile->create_child("FreeTopOvercommitMemoryQuery", true, 
true);
         freed_mem += MemTrackerLimiter::free_top_overcommit_query(
-                _s_process_minor_gc_size - freed_mem, pre_vm_rss, 
pre_sys_mem_available,
+                MemInfo::process_minor_gc_size() - freed_mem, pre_vm_rss, 
pre_sys_mem_available,
                 toq_profile);
-        if (freed_mem > _s_process_minor_gc_size) {
+        if (freed_mem > MemInfo::process_minor_gc_size()) {
             return true;
         }
     }
@@ -183,7 +179,7 @@ bool MemInfo::process_full_gc() {
     std::string pre_sys_mem_available = MemInfo::sys_mem_available_str();
 
     Defer defer {[&]() {
-        notify_je_purge_dirty_pages();
+        MemInfo::notify_je_purge_dirty_pages();
         std::stringstream ss;
         profile->pretty_print(&ss);
         LOG(INFO) << fmt::format(
@@ -193,16 +189,16 @@ bool MemInfo::process_full_gc() {
     }};
 
     freed_mem += 
CacheManager::instance()->for_each_cache_prune_all(profile.get());
-    notify_je_purge_dirty_pages();
-    if (freed_mem > _s_process_full_gc_size) {
+    MemInfo::notify_je_purge_dirty_pages();
+    if (freed_mem > MemInfo::process_full_gc_size()) {
         return true;
     }
 
     if (config::enable_workload_group_memory_gc) {
         RuntimeProfile* tg_profile = profile->create_child("WorkloadGroup", 
true, true);
-        freed_mem += tg_enable_overcommit_group_gc(_s_process_full_gc_size - 
freed_mem, tg_profile,
-                                                   false);
-        if (freed_mem > _s_process_full_gc_size) {
+        freed_mem += 
tg_enable_overcommit_group_gc(MemInfo::process_full_gc_size() - freed_mem,
+                                                   tg_profile, false);
+        if (freed_mem > MemInfo::process_full_gc_size()) {
             return true;
         }
     }
@@ -211,8 +207,9 @@ bool MemInfo::process_full_gc() {
             "[MemoryGC] before free top memory query in full GC", 
MemTrackerLimiter::Type::QUERY);
     RuntimeProfile* tmq_profile = profile->create_child("FreeTopMemoryQuery", 
true, true);
     freed_mem += MemTrackerLimiter::free_top_memory_query(
-            _s_process_full_gc_size - freed_mem, pre_vm_rss, 
pre_sys_mem_available, tmq_profile);
-    if (freed_mem > _s_process_full_gc_size) {
+            MemInfo::process_full_gc_size() - freed_mem, pre_vm_rss, 
pre_sys_mem_available,
+            tmq_profile);
+    if (freed_mem > MemInfo::process_full_gc_size()) {
         return true;
     }
 
@@ -223,9 +220,9 @@ bool MemInfo::process_full_gc() {
         RuntimeProfile* tol_profile =
                 profile->create_child("FreeTopMemoryOvercommitLoad", true, 
true);
         freed_mem += MemTrackerLimiter::free_top_overcommit_load(
-                _s_process_full_gc_size - freed_mem, pre_vm_rss, 
pre_sys_mem_available,
+                MemInfo::process_full_gc_size() - freed_mem, pre_vm_rss, 
pre_sys_mem_available,
                 tol_profile);
-        if (freed_mem > _s_process_full_gc_size) {
+        if (freed_mem > MemInfo::process_full_gc_size()) {
             return true;
         }
     }
@@ -233,15 +230,13 @@ bool MemInfo::process_full_gc() {
     VLOG_NOTICE << MemTrackerLimiter::type_detail_usage(
             "[MemoryGC] before free top memory load in full GC", 
MemTrackerLimiter::Type::LOAD);
     RuntimeProfile* tml_profile = profile->create_child("FreeTopMemoryLoad", 
true, true);
-    freed_mem += MemTrackerLimiter::free_top_memory_load(
-            _s_process_full_gc_size - freed_mem, pre_vm_rss, 
pre_sys_mem_available, tml_profile);
-    if (freed_mem > _s_process_full_gc_size) {
-        return true;
-    }
-    return false;
+    freed_mem +=
+            
MemTrackerLimiter::free_top_memory_load(MemInfo::process_full_gc_size() - 
freed_mem,
+                                                    pre_vm_rss, 
pre_sys_mem_available, tml_profile);
+    return freed_mem > MemInfo::process_full_gc_size();
 }
 
-int64_t MemInfo::tg_not_enable_overcommit_group_gc() {
+int64_t MemInfo::tg_disable_overcommit_group_gc() {
     MonotonicStopWatch watch;
     watch.start();
     std::vector<WorkloadGroupPtr> task_groups;
diff --git a/be/src/util/mem_info.h b/be/src/util/mem_info.h
index f6c9407d73e..5606ebd45d6 100644
--- a/be/src/util/mem_info.h
+++ b/be/src/util/mem_info.h
@@ -92,6 +92,12 @@ public:
     static inline int64_t sys_mem_available_warning_water_mark() {
         return _s_sys_mem_available_warning_water_mark;
     }
+    static inline int64_t process_minor_gc_size() {
+        return _s_process_minor_gc_size.load(std::memory_order_relaxed);
+    }
+    static inline int64_t process_full_gc_size() {
+        return _s_process_full_gc_size.load(std::memory_order_relaxed);
+    }
 
     static inline int64_t get_tc_metrics(const std::string& name) {
 #ifndef USE_JEMALLOC
@@ -152,30 +158,11 @@ public:
         return _s_allocator_cache_mem.load(std::memory_order_relaxed);
     }
     static inline std::string allocator_cache_mem_str() { return 
_s_allocator_cache_mem_str; }
-    static inline int64_t proc_mem_no_allocator_cache() {
-        return _s_proc_mem_no_allocator_cache.load(std::memory_order_relaxed) +
-               refresh_interval_memory_growth;
-    }
 
     // Tcmalloc property `generic.total_physical_bytes` records the total length of the virtual memory
     // obtained by the process malloc, not the physical memory actually used by the process in the OS.
     static void refresh_allocator_mem();
 
-    /** jemalloc pdirty is number of pages within unused extents that are 
potentially
-      * dirty, and for which madvise() or similar has not been called.
-      *
-      * So they will be subtracted from RSS to make accounting more
-      * accurate, since those pages are not really RSS but a memory
-      * that can be used at anytime via jemalloc.
-      */
-    static inline void refresh_proc_mem_no_allocator_cache() {
-        _s_proc_mem_no_allocator_cache.store(
-                PerfCounters::get_vm_rss() - 
static_cast<int64_t>(_s_allocator_cache_mem.load(
-                                                     
std::memory_order_relaxed)),
-                std::memory_order_relaxed);
-        refresh_interval_memory_growth = 0;
-    }
-
     static inline int64_t mem_limit() {
         DCHECK(_s_initialized);
         return _s_mem_limit.load(std::memory_order_relaxed);
@@ -193,17 +180,13 @@ public:
         return 
PrettyPrinter::print(_s_soft_mem_limit.load(std::memory_order_relaxed),
                                     TUnit::BYTES);
     }
-    static bool is_exceed_soft_mem_limit(int64_t bytes = 0) {
-        return proc_mem_no_allocator_cache() + bytes >= soft_mem_limit() ||
-               sys_mem_available() < sys_mem_available_warning_water_mark();
-    }
 
     static std::string debug_string();
 
     static bool process_minor_gc();
     static bool process_full_gc();
 
-    static int64_t tg_not_enable_overcommit_group_gc();
+    static int64_t tg_disable_overcommit_group_gc();
     static int64_t tg_enable_overcommit_group_gc(int64_t request_free_memory,
                                                  RuntimeProfile* profile, bool 
is_minor_gc);
 
@@ -220,7 +203,6 @@ private:
     static std::atomic<int64_t> _s_allocator_cache_mem;
     static std::string _s_allocator_cache_mem_str;
     static std::atomic<int64_t> _s_virtual_memory_used;
-    static std::atomic<int64_t> _s_proc_mem_no_allocator_cache;
 
     static std::atomic<int64_t> _s_sys_mem_available;
     static int64_t _s_sys_mem_available_low_water_mark;
diff --git a/be/src/vec/common/allocator.cpp b/be/src/vec/common/allocator.cpp
index a72165f936c..f6758a2dbb9 100644
--- a/be/src/vec/common/allocator.cpp
+++ b/be/src/vec/common/allocator.cpp
@@ -28,6 +28,7 @@
 
 // Allocator is used by too many files. For compilation speed, put dependencies in `.cpp` as much as possible.
 #include "runtime/fragment_mgr.h"
+#include "runtime/memory/global_memory_arbitrator.h"
 #include "runtime/memory/mem_tracker_limiter.h"
 #include "runtime/memory/thread_mem_tracker_mgr.h"
 #include "runtime/thread_context.h"
@@ -46,7 +47,7 @@ void Allocator<clear_memory_, mmap_populate, 
use_mmap>::sys_memory_check(size_t
     if (doris::thread_context()->skip_memory_check != 0) {
         return;
     }
-    if (doris::MemTrackerLimiter::sys_mem_exceed_limit_check(size)) {
+    if (doris::GlobalMemoryArbitrator::is_exceed_hard_mem_limit(size)) {
         // Only thread attach query, and has not completely waited for 
thread_wait_gc_max_milliseconds,
         // will wait for gc, asynchronous cancel or throw bad::alloc.
         // Otherwise, if the external catch, directly throw bad::alloc.
@@ -58,9 +59,9 @@ void Allocator<clear_memory_, mmap_populate, 
use_mmap>::sys_memory_check(size_t
                 
doris::thread_context()->thread_mem_tracker()->peak_consumption(),
                 doris::thread_context()->thread_mem_tracker()->consumption(),
                 
doris::thread_context()->thread_mem_tracker_mgr->last_consumer_tracker(),
-                doris::MemTrackerLimiter::process_limit_exceeded_errmsg_str());
+                
doris::GlobalMemoryArbitrator::process_limit_exceeded_errmsg_str());
 
-        if (size > 1024l * 1024 * 1024 && 
!doris::enable_thread_catch_bad_alloc &&
+        if (size > 1024L * 1024 * 1024 && 
!doris::enable_thread_catch_bad_alloc &&
             !doris::config::disable_memory_gc) { // 1G
             err_msg += "\nAlloc Stacktrace:\n" + doris::get_stack_trace();
         }
@@ -83,7 +84,7 @@ void Allocator<clear_memory_, mmap_populate, 
use_mmap>::sys_memory_check(size_t
                     doris::config::thread_wait_gc_max_milliseconds, err_msg);
             while (wait_milliseconds < 
doris::config::thread_wait_gc_max_milliseconds) {
                 std::this_thread::sleep_for(std::chrono::milliseconds(100));
-                if 
(!doris::MemTrackerLimiter::sys_mem_exceed_limit_check(size)) {
+                if 
(!doris::GlobalMemoryArbitrator::is_exceed_hard_mem_limit(size)) {
                     doris::MemInfo::refresh_interval_memory_growth += size;
                     break;
                 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to