This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.1 by this push:
new 2ed6a00fd1b [opt](memory) Add GlobalMemoryArbitrator and support
ReserveMemory (#34985) (#35070)
2ed6a00fd1b is described below
commit 2ed6a00fd1b6ec64b28c86a130d730f68eab1700
Author: Xinyi Zou <[email protected]>
AuthorDate: Wed May 22 09:53:45 2024 +0800
[opt](memory) Add GlobalMemoryArbitrator and support ReserveMemory (#34985)
(#35070)
---
be/src/agent/task_worker_pool.cpp | 3 +-
be/src/common/daemon.cpp | 25 ++--
be/src/olap/memtable_memory_limiter.cpp | 22 ++--
be/src/olap/olap_server.cpp | 5 +-
be/src/olap/rowset/segcompaction.cpp | 3 +-
be/src/olap/rowset_builder.cpp | 3 +-
be/src/runtime/memory/global_memory_arbitrator.cpp | 34 ++++++
be/src/runtime/memory/global_memory_arbitrator.h | 136 +++++++++++++++++++++
be/src/runtime/memory/mem_tracker.h | 8 +-
be/src/runtime/memory/mem_tracker_limiter.cpp | 51 +-------
be/src/runtime/memory/mem_tracker_limiter.h | 24 +++-
be/src/runtime/memory/thread_mem_tracker_mgr.cpp | 2 +
be/src/runtime/memory/thread_mem_tracker_mgr.h | 101 +++++++++++++--
be/src/runtime/thread_context.h | 22 ++++
be/src/runtime/workload_group/workload_group.cpp | 3 +-
.../workload_group/workload_group_manager.cpp | 10 +-
be/src/util/mem_info.cpp | 53 ++++----
be/src/util/mem_info.h | 32 ++---
be/src/vec/common/allocator.cpp | 9 +-
19 files changed, 390 insertions(+), 156 deletions(-)
diff --git a/be/src/agent/task_worker_pool.cpp
b/be/src/agent/task_worker_pool.cpp
index 30c83fdb878..37ec4bc6da7 100644
--- a/be/src/agent/task_worker_pool.cpp
+++ b/be/src/agent/task_worker_pool.cpp
@@ -73,6 +73,7 @@
#include "olap/txn_manager.h"
#include "olap/utils.h"
#include "runtime/exec_env.h"
+#include "runtime/memory/global_memory_arbitrator.h"
#include "runtime/snapshot_loader.h"
#include "service/backend_options.h"
#include "util/debug_points.h"
@@ -1563,7 +1564,7 @@ void
PublishVersionWorkerPool::publish_version_callback(const TAgentTaskRequest&
.error(status);
} else {
if (!config::disable_auto_compaction &&
- !MemInfo::is_exceed_soft_mem_limit(GB_EXCHANGE_BYTE)) {
+
!GlobalMemoryArbitrator::is_exceed_soft_mem_limit(GB_EXCHANGE_BYTE)) {
for (auto [tablet_id, _] : succ_tablets) {
TabletSharedPtr tablet =
_engine.tablet_manager()->get_tablet(tablet_id);
if (tablet != nullptr) {
diff --git a/be/src/common/daemon.cpp b/be/src/common/daemon.cpp
index 8d79e6f2181..4787f603650 100644
--- a/be/src/common/daemon.cpp
+++ b/be/src/common/daemon.cpp
@@ -47,6 +47,7 @@
#include "runtime/client_cache.h"
#include "runtime/exec_env.h"
#include "runtime/fragment_mgr.h"
+#include "runtime/memory/global_memory_arbitrator.h"
#include "runtime/memory/mem_tracker.h"
#include "runtime/memory/mem_tracker_limiter.h"
#include "runtime/runtime_query_statistics_mgr.h"
@@ -191,7 +192,7 @@ void Daemon::memory_maintenance_thread() {
// Refresh process memory metrics.
doris::PerfCounters::refresh_proc_status();
doris::MemInfo::refresh_proc_meminfo();
- doris::MemInfo::refresh_proc_mem_no_allocator_cache();
+ doris::GlobalMemoryArbitrator::refresh_vm_rss_sub_allocator_cache();
// Update and print memory stat when the memory changes by 256M.
if (abs(last_print_proc_mem - PerfCounters::get_vm_rss()) > 268435456)
{
@@ -212,7 +213,7 @@ void Daemon::memory_maintenance_thread() {
ExecEnv::GetInstance()->brpc_iobuf_block_memory_tracker()->set_consumption(
butil::IOBuf::block_memory());
- LOG(INFO) << MemTrackerLimiter::
+ LOG(INFO) << doris::GlobalMemoryArbitrator::
process_mem_log_str(); // print mem log when
memory state by 256M
}
}
@@ -229,21 +230,22 @@ void Daemon::memory_gc_thread() {
continue;
}
auto sys_mem_available = doris::MemInfo::sys_mem_available();
- auto proc_mem_no_allocator_cache =
doris::MemInfo::proc_mem_no_allocator_cache();
+ auto process_memory_usage =
doris::GlobalMemoryArbitrator::process_memory_usage();
// GC excess memory for resource groups that not enable overcommit
- auto tg_free_mem = doris::MemInfo::tg_not_enable_overcommit_group_gc();
+ auto tg_free_mem = doris::MemInfo::tg_disable_overcommit_group_gc();
sys_mem_available += tg_free_mem;
- proc_mem_no_allocator_cache -= tg_free_mem;
+ process_memory_usage -= tg_free_mem;
if (memory_full_gc_sleep_time_ms <= 0 &&
(sys_mem_available <
doris::MemInfo::sys_mem_available_low_water_mark() ||
- proc_mem_no_allocator_cache >= doris::MemInfo::mem_limit())) {
+ process_memory_usage >= doris::MemInfo::mem_limit())) {
// No longer full gc and minor gc during sleep.
memory_full_gc_sleep_time_ms = memory_gc_sleep_time_ms;
memory_minor_gc_sleep_time_ms = memory_gc_sleep_time_ms;
- LOG(INFO) << fmt::format("[MemoryGC] start full GC, {}.",
-
MemTrackerLimiter::process_limit_exceeded_errmsg_str());
+ LOG(INFO) << fmt::format(
+ "[MemoryGC] start full GC, {}.",
+
doris::GlobalMemoryArbitrator::process_limit_exceeded_errmsg_str());
doris::MemTrackerLimiter::print_log_process_usage();
if (doris::MemInfo::process_full_gc()) {
// If there is not enough memory to be gc, the process memory
usage will not be printed in the next continuous gc.
@@ -251,11 +253,12 @@ void Daemon::memory_gc_thread() {
}
} else if (memory_minor_gc_sleep_time_ms <= 0 &&
(sys_mem_available <
doris::MemInfo::sys_mem_available_warning_water_mark() ||
- proc_mem_no_allocator_cache >=
doris::MemInfo::soft_mem_limit())) {
+ process_memory_usage >= doris::MemInfo::soft_mem_limit()))
{
// No minor gc during sleep, but full gc is possible.
memory_minor_gc_sleep_time_ms = memory_gc_sleep_time_ms;
- LOG(INFO) << fmt::format("[MemoryGC] start minor GC, {}.",
-
MemTrackerLimiter::process_soft_limit_exceeded_errmsg_str());
+ LOG(INFO) << fmt::format(
+ "[MemoryGC] start minor GC, {}.",
+
doris::GlobalMemoryArbitrator::process_soft_limit_exceeded_errmsg_str());
doris::MemTrackerLimiter::print_log_process_usage();
if (doris::MemInfo::process_minor_gc()) {
doris::MemTrackerLimiter::enable_print_log_process_usage();
diff --git a/be/src/olap/memtable_memory_limiter.cpp
b/be/src/olap/memtable_memory_limiter.cpp
index f0a46aaa452..dc128137ae4 100644
--- a/be/src/olap/memtable_memory_limiter.cpp
+++ b/be/src/olap/memtable_memory_limiter.cpp
@@ -88,7 +88,8 @@ int64_t MemTableMemoryLimiter::_avail_mem_lack() {
int64_t MemTableMemoryLimiter::_proc_mem_extra() {
// reserve a small amount of memory so we do not trigger MinorGC
auto reserved_mem = doris::MemInfo::sys_mem_available_low_water_mark();
- auto proc_mem_extra = MemInfo::proc_mem_no_allocator_cache() - MemInfo::soft_mem_limit();
+ // How far process memory usage (vm_rss minus allocator cache, plus reserved memory and growth
+ // since the last refresh) is above the soft limit; negative while below it. The low water mark
+ // is added on top as a safety margin.
+ auto proc_mem_extra =
+ GlobalMemoryArbitrator::process_memory_usage() - MemInfo::soft_mem_limit();
return proc_mem_extra + reserved_mem;
}
@@ -222,14 +223,17 @@ void MemTableMemoryLimiter::refresh_mem_tracker() {
_last_limit = limit;
_log_timer.reset();
- LOG(INFO) << ss.str() << ", process mem: " <<
PerfCounters::get_vm_rss_str()
- << " (without allocator cache: "
- <<
PrettyPrinter::print_bytes(MemInfo::proc_mem_no_allocator_cache())
- << "), load mem: " <<
PrettyPrinter::print_bytes(_mem_tracker->consumption())
- << ", memtable writers num: " << _writers.size()
- << " (active: " << PrettyPrinter::print_bytes(_active_mem_usage)
- << ", write: " << PrettyPrinter::print_bytes(_write_mem_usage)
- << ", flush: " << PrettyPrinter::print_bytes(_flush_mem_usage)
<< ")";
+ // if not exist load task, this log should not be printed.
+ if (_mem_usage != 0) {
+ LOG(INFO) << ss.str() << ", process mem: " <<
PerfCounters::get_vm_rss_str()
+ << " (without allocator cache: "
+ <<
PrettyPrinter::print_bytes(GlobalMemoryArbitrator::process_memory_usage())
+ << "), load mem: " <<
PrettyPrinter::print_bytes(_mem_tracker->consumption())
+ << ", memtable writers num: " << _writers.size()
+ << " (active: " <<
PrettyPrinter::print_bytes(_active_mem_usage)
+ << ", write: " <<
PrettyPrinter::print_bytes(_write_mem_usage)
+ << ", flush: " <<
PrettyPrinter::print_bytes(_flush_mem_usage) << ")";
+ }
}
void MemTableMemoryLimiter::_refresh_mem_tracker() {
diff --git a/be/src/olap/olap_server.cpp b/be/src/olap/olap_server.cpp
index 0a8bcedf14e..b667a804906 100644
--- a/be/src/olap/olap_server.cpp
+++ b/be/src/olap/olap_server.cpp
@@ -70,6 +70,7 @@
#include "olap/task/index_builder.h"
#include "runtime/client_cache.h"
#include "runtime/memory/cache_manager.h"
+#include "runtime/memory/global_memory_arbitrator.h"
#include "service/brpc.h"
#include "service/point_query_executor.h"
#include "util/brpc_client_cache.h"
@@ -621,7 +622,7 @@ void StorageEngine::_compaction_tasks_producer_callback() {
int64_t interval = config::generate_compaction_tasks_interval_ms;
do {
if (!config::disable_auto_compaction &&
- !MemInfo::is_exceed_soft_mem_limit(GB_EXCHANGE_BYTE)) {
+
!GlobalMemoryArbitrator::is_exceed_soft_mem_limit(GB_EXCHANGE_BYTE)) {
_adjust_compaction_thread_num();
bool check_score = false;
@@ -1359,7 +1360,7 @@ void
StorageEngine::_cold_data_compaction_producer_callback() {
while (!_stop_background_threads_latch.wait_for(
std::chrono::seconds(config::cold_data_compaction_interval_sec))) {
if (config::disable_auto_compaction ||
- MemInfo::is_exceed_soft_mem_limit(GB_EXCHANGE_BYTE)) {
+
GlobalMemoryArbitrator::is_exceed_soft_mem_limit(GB_EXCHANGE_BYTE)) {
continue;
}
diff --git a/be/src/olap/rowset/segcompaction.cpp
b/be/src/olap/rowset/segcompaction.cpp
index 2de881ee13a..8fee04ccb80 100644
--- a/be/src/olap/rowset/segcompaction.cpp
+++ b/be/src/olap/rowset/segcompaction.cpp
@@ -56,6 +56,7 @@
#include "olap/storage_engine.h"
#include "olap/tablet_reader.h"
#include "olap/tablet_schema.h"
+#include "runtime/memory/global_memory_arbitrator.h"
#include "runtime/thread_context.h"
#include "util/debug_points.h"
#include "util/mem_info.h"
@@ -212,7 +213,7 @@ Status
SegcompactionWorker::_create_segment_writer_for_segcompaction(
Status
SegcompactionWorker::_do_compact_segments(SegCompactionCandidatesSharedPtr
segments) {
SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(ExecEnv::GetInstance()->segcompaction_mem_tracker());
/* throttle segcompaction task if memory depleted */
- if (MemInfo::is_exceed_soft_mem_limit(GB_EXCHANGE_BYTE)) {
+ if (GlobalMemoryArbitrator::is_exceed_soft_mem_limit(GB_EXCHANGE_BYTE)) {
return Status::Error<FETCH_MEMORY_EXCEEDED>("skip segcompaction due to
memory shortage");
}
diff --git a/be/src/olap/rowset_builder.cpp b/be/src/olap/rowset_builder.cpp
index 2153a9ad1a8..7ff06b39eb0 100644
--- a/be/src/olap/rowset_builder.cpp
+++ b/be/src/olap/rowset_builder.cpp
@@ -48,6 +48,7 @@
#include "olap/tablet_meta.h"
#include "olap/tablet_schema.h"
#include "olap/txn_manager.h"
+#include "runtime/memory/global_memory_arbitrator.h"
#include "util/brpc_client_cache.h"
#include "util/mem_info.h"
#include "util/ref_count_closure.h"
@@ -140,7 +141,7 @@ Status
RowsetBuilder::init_mow_context(std::shared_ptr<MowContext>& mow_context)
Status RowsetBuilder::check_tablet_version_count() {
if (!_tablet->exceed_version_limit(config::max_tablet_version_num - 100) ||
- MemInfo::is_exceed_soft_mem_limit(GB_EXCHANGE_BYTE)) {
+ GlobalMemoryArbitrator::is_exceed_soft_mem_limit(GB_EXCHANGE_BYTE)) {
return Status::OK();
}
//trigger compaction
diff --git a/be/src/runtime/memory/global_memory_arbitrator.cpp
b/be/src/runtime/memory/global_memory_arbitrator.cpp
new file mode 100644
index 00000000000..dc686f7c5ab
--- /dev/null
+++ b/be/src/runtime/memory/global_memory_arbitrator.cpp
@@ -0,0 +1,34 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "runtime/memory/global_memory_arbitrator.h"
+
+#include <bvar/bvar.h>
+
+namespace doris {
+
+// bvar metrics exposing the arbitrator's view of process memory; their values come from the
+// callbacks below.
+bvar::PassiveStatus<int64_t> g_vm_rss_sub_allocator_cache(
+ "meminfo_vm_rss_sub_allocator_cache",
+ [](void*) { return GlobalMemoryArbitrator::vm_rss_sub_allocator_cache(); }, nullptr);
+bvar::PassiveStatus<int64_t> g_process_memory_usage(
+ "meminfo_process_memory_usage",
+ [](void*) { return GlobalMemoryArbitrator::process_memory_usage(); }, nullptr);
+
+// -1 until the first refresh_vm_rss_sub_allocator_cache() call stores a real value.
+std::atomic<int64_t> GlobalMemoryArbitrator::_s_vm_rss_sub_allocator_cache = -1;
+// Memory reserved through try_reserve_process_memory() and not yet released.
+std::atomic<int64_t> GlobalMemoryArbitrator::_s_process_reserved_memory = 0;
+
+} // namespace doris
diff --git a/be/src/runtime/memory/global_memory_arbitrator.h
b/be/src/runtime/memory/global_memory_arbitrator.h
new file mode 100644
index 00000000000..b1879cb1a7b
--- /dev/null
+++ b/be/src/runtime/memory/global_memory_arbitrator.h
@@ -0,0 +1,136 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include "util/mem_info.h"
+
+namespace doris {
+
+class GlobalMemoryArbitrator {
+public:
+ /** jemalloc pdirty is number of pages within unused extents that are potentially
+ * dirty, and for which madvise() or similar has not been called.
+ *
+ * So they will be subtracted from RSS to make accounting more
+ * accurate, since those pages are not really RSS but a memory
+ * that can be used at anytime via jemalloc.
+ */
+ static inline void refresh_vm_rss_sub_allocator_cache() {
+ _s_vm_rss_sub_allocator_cache.store(
+ PerfCounters::get_vm_rss() - static_cast<int64_t>(MemInfo::allocator_cache_mem()),
+ std::memory_order_relaxed);
+ MemInfo::refresh_interval_memory_growth = 0;
+ }
+ static inline int64_t vm_rss_sub_allocator_cache() {
+ return _s_vm_rss_sub_allocator_cache.load(std::memory_order_relaxed);
+ }
+
+ // Use this when execution logic needs the process memory usage: real process memory
+ // (vm_rss) minus the jemalloc dirty page cache, plus reserved memory and the memory growth
+ // since the last vm_rss refresh.
+ static inline int64_t process_memory_usage() {
+ return vm_rss_sub_allocator_cache() +
+ MemInfo::refresh_interval_memory_growth.load(std::memory_order_relaxed) +
+ process_reserved_memory();
+ }
+
+ // System available memory as seen by the arbitrator: MemInfo::sys_mem_available() minus the
+ // memory reserved by this process and the growth since the last refresh. A reservation is
+ // not allocated yet and growth happened after the refresh, so neither is reflected in the
+ // system value. This mirrors what process_memory_usage() adds on the process side.
+ static inline int64_t sys_mem_available() {
+ return MemInfo::sys_mem_available() -
+ MemInfo::refresh_interval_memory_growth.load(std::memory_order_relaxed) -
+ process_reserved_memory();
+ }
+
+ // Atomically adds `bytes` to the process reserved memory. Returns false and reserves nothing
+ // if the resulting process memory usage would reach the hard limit, or if system available
+ // memory minus all outstanding reservations (including this one) would drop below the low
+ // water mark.
+ static inline bool try_reserve_process_memory(int64_t bytes) {
+ int64_t old_reserved_mem = _s_process_reserved_memory.load(std::memory_order_relaxed);
+ int64_t new_reserved_mem = 0;
+ do {
+ new_reserved_mem = old_reserved_mem + bytes;
+ const int64_t growth =
+ MemInfo::refresh_interval_memory_growth.load(std::memory_order_relaxed);
+ // Check against the reservation total about to be published, so concurrent
+ // reservations are accounted against both the process limit and system memory.
+ if (UNLIKELY(vm_rss_sub_allocator_cache() + growth + new_reserved_mem >=
+ MemInfo::mem_limit() ||
+ MemInfo::sys_mem_available() - growth - new_reserved_mem <
+ MemInfo::sys_mem_available_low_water_mark())) {
+ return false;
+ }
+ } while (!_s_process_reserved_memory.compare_exchange_weak(
+ old_reserved_mem, new_reserved_mem, std::memory_order_relaxed));
+ return true;
+ }
+
+ static inline void release_process_reserved_memory(int64_t bytes) {
+ _s_process_reserved_memory.fetch_sub(bytes, std::memory_order_relaxed);
+ }
+
+ static inline int64_t process_reserved_memory() {
+ return _s_process_reserved_memory.load(std::memory_order_relaxed);
+ }
+
+ static bool is_exceed_soft_mem_limit(int64_t bytes = 0) {
+ return process_memory_usage() + bytes >= MemInfo::soft_mem_limit() ||
+ sys_mem_available() - bytes < MemInfo::sys_mem_available_warning_water_mark();
+ }
+
+ static bool is_exceed_hard_mem_limit(int64_t bytes = 0) {
+ // Limit process memory usage using the actual physical memory of the process in `/proc/self/status`.
+ // This is independent of the consumption value of the mem tracker, which counts the virtual memory
+ // of the process malloc.
+ // for fast, expect MemInfo::initialized() to be true.
+ //
+ // tcmalloc/jemalloc allocator cache does not participate in the mem check as part of the process physical memory.
+ // because `new/malloc` will trigger mem hook when using tcmalloc/jemalloc allocator cache,
+ // but it may not actually alloc physical memory, which is not expected in mem hook fail.
+ return process_memory_usage() + bytes >= MemInfo::mem_limit() ||
+ sys_mem_available() - bytes < MemInfo::sys_mem_available_low_water_mark();
+ }
+
+ static std::string process_mem_log_str() {
+ return fmt::format(
+ "os physical memory {}. process memory used {}, limit {}, soft limit {}. sys "
+ "available memory {}, low water mark {}, warning water mark {}. Refresh interval "
+ "memory growth {} B, process reserved memory {} B",
+ PrettyPrinter::print(MemInfo::physical_mem(), TUnit::BYTES),
+ PerfCounters::get_vm_rss_str(), MemInfo::mem_limit_str(),
+ MemInfo::soft_mem_limit_str(), MemInfo::sys_mem_available_str(),
+ PrettyPrinter::print(MemInfo::sys_mem_available_low_water_mark(), TUnit::BYTES),
+ PrettyPrinter::print(MemInfo::sys_mem_available_warning_water_mark(), TUnit::BYTES),
+ MemInfo::refresh_interval_memory_growth.load(std::memory_order_relaxed),
+ process_reserved_memory());
+ }
+
+ static std::string process_limit_exceeded_errmsg_str() {
+ return fmt::format(
+ "process memory used {} exceed limit {} or sys available memory {} less than low "
+ "water mark {}",
+ PerfCounters::get_vm_rss_str(), MemInfo::mem_limit_str(),
+ MemInfo::sys_mem_available_str(),
+ PrettyPrinter::print(MemInfo::sys_mem_available_low_water_mark(), TUnit::BYTES));
+ }
+
+ static std::string process_soft_limit_exceeded_errmsg_str() {
+ return fmt::format(
+ "process memory used {} exceed soft limit {} or sys available memory {} less than "
+ "warning water mark {}.",
+ PerfCounters::get_vm_rss_str(), MemInfo::soft_mem_limit_str(),
+ MemInfo::sys_mem_available_str(),
+ PrettyPrinter::print(MemInfo::sys_mem_available_warning_water_mark(),
+ TUnit::BYTES));
+ }
+
+private:
+ static std::atomic<int64_t> _s_vm_rss_sub_allocator_cache;
+ static std::atomic<int64_t> _s_process_reserved_memory;
+};
+
+} // namespace doris
diff --git a/be/src/runtime/memory/mem_tracker.h
b/be/src/runtime/memory/mem_tracker.h
index de7628c1749..d308d201901 100644
--- a/be/src/runtime/memory/mem_tracker.h
+++ b/be/src/runtime/memory/mem_tracker.h
@@ -70,7 +70,7 @@ public:
MemCounter() : _current_value(0), _peak_value(0) {}
+ // Adds the signed delta to the current value and raises the peak if the new value exceeds it.
void add(int64_t delta) {
- auto value = _current_value.fetch_add(delta, std::memory_order_relaxed) + delta;
+ // fetch_add returns the previous value, so delta is added again to get the new value.
+ int64_t value = _current_value.fetch_add(delta, std::memory_order_relaxed) + delta;
update_peak(value);
}
@@ -79,8 +79,8 @@ public:
}
bool try_add(int64_t delta, int64_t max) {
- auto cur_val = _current_value.load(std::memory_order_relaxed);
- auto new_val = 0;
+ int64_t cur_val = _current_value.load(std::memory_order_relaxed);
+ int64_t new_val = 0;
do {
new_val = cur_val + delta;
if (UNLIKELY(new_val > max)) {
@@ -100,7 +100,7 @@ public:
}
void update_peak(int64_t value) {
- auto pre_value = _peak_value.load(std::memory_order_relaxed);
+ int64_t pre_value = _peak_value.load(std::memory_order_relaxed);
while (value > pre_value && !_peak_value.compare_exchange_weak(
pre_value, value,
std::memory_order_relaxed)) {
}
diff --git a/be/src/runtime/memory/mem_tracker_limiter.cpp
b/be/src/runtime/memory/mem_tracker_limiter.cpp
index 2218bed6959..b84f7c54957 100644
--- a/be/src/runtime/memory/mem_tracker_limiter.cpp
+++ b/be/src/runtime/memory/mem_tracker_limiter.cpp
@@ -31,6 +31,7 @@
#include "olap/memtable_memory_limiter.h"
#include "runtime/exec_env.h"
#include "runtime/fragment_mgr.h"
+#include "runtime/memory/global_memory_arbitrator.h"
#include "runtime/thread_context.h"
#include "runtime/workload_group/workload_group.h"
#include "service/backend_options.h"
@@ -306,7 +307,7 @@ void MemTrackerLimiter::print_log_usage(const std::string&
msg) {
if (_enable_print_log_usage) {
_enable_print_log_usage = false;
std::string detail = msg;
- detail += "\nProcess Memory Summary:\n " +
MemTrackerLimiter::process_mem_log_str();
+ detail += "\nProcess Memory Summary:\n " +
GlobalMemoryArbitrator::process_mem_log_str();
detail += "\nMemory Tracker Summary: " + log_usage();
std::string child_trackers_usage;
std::vector<MemTracker::Snapshot> snapshots;
@@ -324,7 +325,7 @@ void MemTrackerLimiter::print_log_usage(const std::string&
msg) {
std::string MemTrackerLimiter::log_process_usage_str() {
std::string detail;
- detail += "\nProcess Memory Summary:\n " +
MemTrackerLimiter::process_mem_log_str();
+ detail += "\nProcess Memory Summary:\n " +
GlobalMemoryArbitrator::process_mem_log_str();
std::vector<MemTracker::Snapshot> snapshots;
MemTrackerLimiter::make_process_snapshots(&snapshots);
MemTrackerLimiter::make_type_snapshots(&snapshots,
MemTrackerLimiter::Type::GLOBAL);
@@ -355,50 +356,6 @@ void MemTrackerLimiter::print_log_process_usage() {
}
}
-bool MemTrackerLimiter::sys_mem_exceed_limit_check(int64_t bytes) {
- // Limit process memory usage using the actual physical memory of the
process in `/proc/self/status`.
- // This is independent of the consumption value of the mem tracker, which
counts the virtual memory
- // of the process malloc.
- // for fast, expect MemInfo::initialized() to be true.
- //
- // tcmalloc/jemalloc allocator cache does not participate in the mem check
as part of the process physical memory.
- // because `new/malloc` will trigger mem hook when using tcmalloc/jemalloc
allocator cache,
- // but it may not actually alloc physical memory, which is not expected in
mem hook fail.
- return MemInfo::proc_mem_no_allocator_cache() + bytes >=
MemInfo::mem_limit() ||
- MemInfo::sys_mem_available() <
MemInfo::sys_mem_available_low_water_mark();
-}
-
-std::string MemTrackerLimiter::process_mem_log_str() {
- return fmt::format(
- "os physical memory {}. process memory used {}, limit {}, soft
limit {}. sys "
- "available memory {}, low water mark {}, warning water mark {}.
Refresh interval "
- "memory growth {} B",
- PrettyPrinter::print(MemInfo::physical_mem(), TUnit::BYTES),
- PerfCounters::get_vm_rss_str(), MemInfo::mem_limit_str(),
MemInfo::soft_mem_limit_str(),
- MemInfo::sys_mem_available_str(),
- PrettyPrinter::print(MemInfo::sys_mem_available_low_water_mark(),
TUnit::BYTES),
-
PrettyPrinter::print(MemInfo::sys_mem_available_warning_water_mark(),
TUnit::BYTES),
- MemInfo::refresh_interval_memory_growth);
-}
-
-std::string MemTrackerLimiter::process_limit_exceeded_errmsg_str() {
- return fmt::format(
- "process memory used {} exceed limit {} or sys available memory {}
less than low "
- "water mark {}",
- PerfCounters::get_vm_rss_str(), MemInfo::mem_limit_str(),
- MemInfo::sys_mem_available_str(),
- PrettyPrinter::print(MemInfo::sys_mem_available_low_water_mark(),
TUnit::BYTES));
-}
-
-std::string MemTrackerLimiter::process_soft_limit_exceeded_errmsg_str() {
- return fmt::format(
- "process memory used {} exceed soft limit {} or sys available
memory {} less than "
- "warning water mark {}.",
- PerfCounters::get_vm_rss_str(), MemInfo::soft_mem_limit_str(),
- MemInfo::sys_mem_available_str(),
-
PrettyPrinter::print(MemInfo::sys_mem_available_warning_water_mark(),
TUnit::BYTES));
-}
-
std::string MemTrackerLimiter::tracker_limit_exceeded_str() {
std::string err_msg = fmt::format(
"memory tracker limit exceeded, tracker label:{}, type:{}, limit "
@@ -490,7 +447,7 @@ int64_t MemTrackerLimiter::free_top_memory_query(
} else if (tracker->consumption() + prepare_free_mem <
min_free_mem) {
min_pq.emplace(tracker->consumption(),
tracker->label());
prepare_free_mem += tracker->consumption();
- } else if (tracker->consumption() > min_pq.top().first) {
+ } else if (!min_pq.empty() && tracker->consumption() >
min_pq.top().first) {
min_pq.emplace(tracker->consumption(),
tracker->label());
prepare_free_mem += tracker->consumption();
while (prepare_free_mem - min_pq.top().first >
min_free_mem) {
diff --git a/be/src/runtime/memory/mem_tracker_limiter.h
b/be/src/runtime/memory/mem_tracker_limiter.h
index 2e510e1f462..3a891ca3a14 100644
--- a/be/src/runtime/memory/mem_tracker_limiter.h
+++ b/be/src/runtime/memory/mem_tracker_limiter.h
@@ -43,7 +43,7 @@ namespace doris {
class RuntimeProfile;
-constexpr auto MEM_TRACKER_GROUP_NUM = 1000;
+// NOTE(review): presumably the number of TrackerLimiterGroup slots (see the
+// ExecEnv::mem_tracker_limiter_pool note on TrackerLimiterGroup); confirm.
+constexpr size_t MEM_TRACKER_GROUP_NUM = 1000;
struct TrackerLimiterGroup {
// Note! in order to enable ExecEnv::mem_tracker_limiter_pool support
resize,
@@ -129,8 +129,6 @@ public:
__builtin_unreachable();
}
- static bool sys_mem_exceed_limit_check(int64_t bytes);
-
void set_consumption() { LOG(FATAL) << "MemTrackerLimiter set_consumption
not supported"; }
Type type() const { return _type; }
int64_t group_num() const { return _group_num; }
@@ -138,6 +136,23 @@ public:
int64_t limit() const { return _limit; }
bool limit_exceeded() const { return _limit >= 0 && _limit <
consumption(); }
+ // Charges `bytes` to this tracker. Returns false, charging nothing, only when the tracker
+ // enforces its limit and the new consumption would exceed it; otherwise always succeeds.
+ bool try_consume(int64_t bytes) const {
+ if (UNLIKELY(bytes == 0)) {
+ return true;
+ }
+ bool st = true;
+ // With query memory overcommit enabled, query/load trackers may go past their own limit,
+ // so the limit is enforced only when overcommit is disabled. A negative limit means
+ // unlimited (as in limit_exceeded()), and try_add() would otherwise always fail on it.
+ if (is_overcommit_tracker() && !config::enable_query_memory_overcommit && _limit >= 0) {
+ st = _consumption->try_add(bytes, _limit);
+ } else {
+ _consumption->add(bytes);
+ }
+ if (st && _query_statistics) {
+ _query_statistics->set_max_peak_memory_bytes(_consumption->peak_value());
+ _query_statistics->set_current_used_memory_bytes(_consumption->current_value());
+ }
+ return st;
+ }
+
Status check_limit(int64_t bytes = 0);
bool is_overcommit_tracker() const { return type() == Type::QUERY ||
type() == Type::LOAD; }
@@ -222,9 +237,6 @@ public:
return querytid;
}
- static std::string process_mem_log_str();
- static std::string process_limit_exceeded_errmsg_str();
- static std::string process_soft_limit_exceeded_errmsg_str();
// Log the memory usage when memory limit is exceeded.
std::string tracker_limit_exceeded_str();
diff --git a/be/src/runtime/memory/thread_mem_tracker_mgr.cpp
b/be/src/runtime/memory/thread_mem_tracker_mgr.cpp
index bc962b51480..766ee643584 100644
--- a/be/src/runtime/memory/thread_mem_tracker_mgr.cpp
+++ b/be/src/runtime/memory/thread_mem_tracker_mgr.cpp
@@ -44,6 +44,7 @@ private:
void ThreadMemTrackerMgr::attach_limiter_tracker(
const std::shared_ptr<MemTrackerLimiter>& mem_tracker) {
DCHECK(mem_tracker);
+ DCHECK(_reserved_mem == 0);
CHECK(init());
flush_untracked_mem();
_limiter_tracker = mem_tracker;
@@ -53,6 +54,7 @@ void ThreadMemTrackerMgr::attach_limiter_tracker(
void ThreadMemTrackerMgr::detach_limiter_tracker(
const std::shared_ptr<MemTrackerLimiter>& old_mem_tracker) {
CHECK(init());
+ release_reserved();
flush_untracked_mem();
_limiter_tracker = old_mem_tracker;
_limiter_tracker_raw = old_mem_tracker.get();
diff --git a/be/src/runtime/memory/thread_mem_tracker_mgr.h
b/be/src/runtime/memory/thread_mem_tracker_mgr.h
index 40fe6e13032..6081b013346 100644
--- a/be/src/runtime/memory/thread_mem_tracker_mgr.h
+++ b/be/src/runtime/memory/thread_mem_tracker_mgr.h
@@ -30,6 +30,7 @@
#include "common/config.h"
#include "runtime/exec_env.h"
+#include "runtime/memory/global_memory_arbitrator.h"
#include "runtime/memory/mem_tracker.h"
#include "runtime/memory/mem_tracker_limiter.h"
#include "util/stack_util.h"
@@ -37,6 +38,8 @@
namespace doris {
+// Bytes taken out of a thread's reservation are synchronized to the process reserved memory once
+// at least this many have accumulated. Signed like the int64_t counters it is compared with,
+// which can be negative, so the comparison is not done in unsigned arithmetic.
+constexpr int64_t SYNC_PROC_RESERVED_INTERVAL_BYTES = (1LL << 20); // 1M
+
// Memory Hook is counted in the memory tracker of the current thread.
class ThreadMemTrackerMgr {
public:
@@ -69,14 +72,14 @@ public:
void start_count_scope_mem() {
CHECK(init());
- _scope_mem = 0;
+ // try_reserve() charges a reservation up front, so the still-unused part of an active
+ // reservation is pre-counted here; stop_count_scope_mem() subtracts what is unused at the end.
+ _scope_mem = _reserved_mem; // consume in advance
_count_scope_mem = true;
}
int64_t stop_count_scope_mem() {
flush_untracked_mem();
_count_scope_mem = false;
- return _scope_mem;
+ // NOTE(review): this relies on flush_untracked_mem() not adding bytes taken out of an active
+ // reservation (kept in _untracked_mem) to _scope_mem a second time; confirm.
+ return _scope_mem - _reserved_mem;
}
// Note that, If call the memory allocation operation in Memory Hook,
@@ -86,6 +89,9 @@ public:
void consume(int64_t size, int skip_large_memory_check = 0);
void flush_untracked_mem();
+ bool try_reserve(int64_t size);
+ void release_reserved();
+
bool is_attach_query() { return _query_id != TUniqueId(); }
bool is_query_cancelled() const { return _is_query_cancelled; }
@@ -123,7 +129,9 @@ private:
bool _init = false;
// Cache untracked mem.
int64_t _untracked_mem = 0;
- int64_t old_untracked_mem = 0;
+ int64_t _old_untracked_mem = 0;
+
+ int64_t _reserved_mem = 0;
bool _count_scope_mem = false;
int64_t _scope_mem = 0;
@@ -164,16 +172,43 @@ inline bool
ThreadMemTrackerMgr::push_consumer_tracker(MemTracker* tracker) {
}
_consumer_tracker_stack.push_back(tracker);
tracker->release(_untracked_mem);
+ tracker->consume(_reserved_mem); // consume in advance
return true;
}
+// Pops the innermost consumer tracker, settling the amounts push_consumer_tracker() applied to it
+// (it released _untracked_mem and pre-charged _reserved_mem).
inline void ThreadMemTrackerMgr::pop_consumer_tracker() {
DCHECK(!_consumer_tracker_stack.empty());
+ // NOTE(review): while a reservation is active, _untracked_mem holds bytes taken out of the
+ // reservation, which this tracker was already charged via _reserved_mem; consuming them here
+ // as well appears to count them twice; confirm.
_consumer_tracker_stack.back()->consume(_untracked_mem);
+ _consumer_tracker_stack.back()->release(_reserved_mem);
_consumer_tracker_stack.pop_back();
}
inline void ThreadMemTrackerMgr::consume(int64_t size, int
skip_large_memory_check) {
+ if (_reserved_mem != 0) {
+ if (_reserved_mem >= size) {
+ // only need to subtract _reserved_mem, no need to consume
MemTracker,
+ // every time _reserved_mem is minus the sum of size >=
SYNC_PROC_RESERVED_INTERVAL_BYTES,
+ // subtract size from process global reserved memory,
+ // because this part of the reserved memory has already been used
by BE process.
+ _reserved_mem -= size;
+ // store bytes that not synchronized to process reserved memory.
+ _untracked_mem += size;
+ if (_untracked_mem >= SYNC_PROC_RESERVED_INTERVAL_BYTES) {
+
doris::GlobalMemoryArbitrator::release_process_reserved_memory(_untracked_mem);
+ _untracked_mem = 0;
+ }
+ return;
+ } else {
+ // reserved memory is insufficient, the remaining _reserved_mem is
subtracted from this memory consumed,
+ // and reset _reserved_mem to 0, and subtract the remaining
_reserved_mem from
+ // process global reserved memory, this means that all reserved
memory has been used by BE process.
+ size -= _reserved_mem;
+
doris::GlobalMemoryArbitrator::release_process_reserved_memory(_reserved_mem +
+
_untracked_mem);
+ _reserved_mem = 0;
+ _untracked_mem = 0;
+ }
+ }
_untracked_mem += size;
if (!_init && !ExecEnv::ready()) {
return;
@@ -205,18 +240,64 @@ inline void ThreadMemTrackerMgr::consume(int64_t size,
int skip_large_memory_che
inline void ThreadMemTrackerMgr::flush_untracked_mem() {
// Temporary memory may be allocated during the consumption of the mem tracker, which will lead to entering
// the Memory Hook again, so suspend consumption to avoid falling into an infinite loop.
- if (!init()) return;
+ if (_untracked_mem == 0 || !init()) {
+ return;
+ }
+ if (_reserved_mem != 0) {
+ // While a reservation is active, try_reserve() has already charged the whole reservation
+ // to the limiter/consumer trackers and _scope_mem. consume() keeps in _untracked_mem only
+ // the bytes taken out of the reservation that are not yet synchronized to the process
+ // reserved memory. Charging them to the trackers here would count them twice and leave
+ // them counted in the process reserved memory, so only synchronize them.
+ doris::GlobalMemoryArbitrator::release_process_reserved_memory(_untracked_mem);
+ _untracked_mem = 0;
+ return;
+ }
_stop_consume = true;
DCHECK(_limiter_tracker_raw);
- old_untracked_mem = _untracked_mem;
- if (_count_scope_mem) _scope_mem += _untracked_mem;
- _limiter_tracker_raw->consume(old_untracked_mem);
- for (auto tracker : _consumer_tracker_stack) {
- tracker->consume(old_untracked_mem);
+ _old_untracked_mem = _untracked_mem;
+ if (_count_scope_mem) {
+ _scope_mem += _untracked_mem;
}
- _untracked_mem -= old_untracked_mem;
+ _limiter_tracker_raw->consume(_old_untracked_mem);
+ for (auto* tracker : _consumer_tracker_stack) {
+ tracker->consume(_old_untracked_mem);
+ }
+ _untracked_mem -= _old_untracked_mem;
_stop_consume = false;
}
+inline bool ThreadMemTrackerMgr::try_reserve(int64_t size) {
+ DCHECK(_limiter_tracker_raw);
+ DCHECK(size >= 0);
+ CHECK(init());
+ // if _reserved_mem is not 0, this is a repeated reserve and
+ // _untracked_mem holds bytes not yet synchronized to process reserved
memory, so it is not flushed here.
+ if (_reserved_mem == 0) {
+ flush_untracked_mem();
+ }
+ if (!_limiter_tracker_raw->try_consume(size)) {
+ return false;
+ }
+ if (!doris::GlobalMemoryArbitrator::try_reserve_process_memory(size)) {
+ _limiter_tracker_raw->release(size); // rollback
+ return false;
+ }
+ if (_count_scope_mem) {
+ _scope_mem += size;
+ }
+ for (auto* tracker : _consumer_tracker_stack) {
+ tracker->consume(size);
+ }
+ _reserved_mem += size;
+ DCHECK(_untracked_mem == 0);
+ return true;
+}
+
+inline void ThreadMemTrackerMgr::release_reserved() {
+ flush_untracked_mem();
+ if (_reserved_mem > 0) {
+
doris::GlobalMemoryArbitrator::release_process_reserved_memory(_reserved_mem);
+ _limiter_tracker_raw->consume(-_reserved_mem);
+ if (_count_scope_mem) {
+ _scope_mem -= _reserved_mem;
+ }
+ for (auto* tracker : _consumer_tracker_stack) {
+ tracker->consume(-_reserved_mem);
+ }
+ _reserved_mem = 0;
+ }
+}
+
} // namespace doris
diff --git a/be/src/runtime/thread_context.h b/be/src/runtime/thread_context.h
index 84b24e79657..72d3c8111f6 100644
--- a/be/src/runtime/thread_context.h
+++ b/be/src/runtime/thread_context.h
@@ -71,6 +71,9 @@
#define SCOPED_CONSUME_MEM_TRACKER_BY_HOOK(mem_tracker) \
auto VARNAME_LINENUM(add_mem_consumer) =
doris::AddThreadMemTrackerConsumerByHook(mem_tracker)
+#define DEFER_RELEASE_RESERVED() \
+ Defer VARNAME_LINENUM(defer) {[&]() {
doris::thread_context()->release_reserved_memory(); }};
+
#define ORPHAN_TRACKER_CHECK()
\
DCHECK(doris::k_doris_exit || !doris::config::enable_memory_orphan_check
|| \
doris::thread_context()->thread_mem_tracker()->label() != "Orphan")
\
@@ -93,6 +96,7 @@
auto VARNAME_LINENUM(scoped_tls_mcbh) = doris::ScopedInitThreadContext()
#define SCOPED_CONSUME_MEM_TRACKER_BY_HOOK(mem_tracker) \
auto VARNAME_LINENUM(scoped_tls_cmtbh) = doris::ScopedInitThreadContext()
+#define DEFER_RELEASE_RESERVED() (void)0
#define ORPHAN_TRACKER_CHECK() (void)0
#define MEMORY_ORPHAN_CHECK() (void)0
#endif
@@ -203,6 +207,24 @@ public:
thread_mem_tracker_mgr->consume(size, skip_large_memory_check);
}
+ bool try_reserve_memory(const int64_t size) const {
+#ifdef USE_MEM_TRACKER
+ DCHECK(doris::k_doris_exit ||
!doris::config::enable_memory_orphan_check ||
+ thread_mem_tracker()->label() != "Orphan")
+ << doris::memory_orphan_check_msg;
+#endif
+ return thread_mem_tracker_mgr->try_reserve(size);
+ }
+
+ void release_reserved_memory() const {
+#ifdef USE_MEM_TRACKER
+ DCHECK(doris::k_doris_exit ||
!doris::config::enable_memory_orphan_check ||
+ thread_mem_tracker()->label() != "Orphan")
+ << doris::memory_orphan_check_msg;
+#endif
+ thread_mem_tracker_mgr->release_reserved();
+ }
+
int thread_local_handle_count = 0;
int skip_memory_check = 0;
int skip_large_memory_check = 0;
diff --git a/be/src/runtime/workload_group/workload_group.cpp
b/be/src/runtime/workload_group/workload_group.cpp
index 39e411de726..fd885aaacb1 100644
--- a/be/src/runtime/workload_group/workload_group.cpp
+++ b/be/src/runtime/workload_group/workload_group.cpp
@@ -30,6 +30,7 @@
#include "pipeline/task_queue.h"
#include "pipeline/task_scheduler.h"
#include "runtime/exec_env.h"
+#include "runtime/memory/global_memory_arbitrator.h"
#include "runtime/memory/mem_tracker_limiter.h"
#include "util/mem_info.h"
#include "util/parse_util.h"
@@ -171,7 +172,7 @@ int64_t WorkloadGroup::gc_memory(int64_t need_free_mem,
RuntimeProfile* profile,
MemTracker::print_bytes(_memory_limit),
BackendOptions::get_localhost());
}
}
- std::string process_mem_usage_str =
MemTrackerLimiter::process_mem_log_str();
+ std::string process_mem_usage_str =
GlobalMemoryArbitrator::process_mem_log_str();
auto cancel_top_overcommit_str = [cancel_str,
process_mem_usage_str](int64_t mem_consumption,
const
std::string& label) {
return fmt::format(
diff --git a/be/src/runtime/workload_group/workload_group_manager.cpp
b/be/src/runtime/workload_group/workload_group_manager.cpp
index 153e9bab8ce..dc4c73782e4 100644
--- a/be/src/runtime/workload_group/workload_group_manager.cpp
+++ b/be/src/runtime/workload_group/workload_group_manager.cpp
@@ -177,29 +177,29 @@ void WorkloadGroupMgr::refresh_wg_memory_info() {
wgs_mem_info[wg_id] = {wg_total_mem_used};
}
+ // *TODO*, modify to use
doris::GlobalMemoryArbitrator::process_memory_usage().
auto proc_vm_rss = PerfCounters::get_vm_rss();
if (all_queries_mem_used <= 0) {
return;
}
- auto process_mem_used = doris::MemInfo::proc_mem_no_allocator_cache();
if (proc_vm_rss < all_queries_mem_used) {
all_queries_mem_used = proc_vm_rss;
}
// process memory used is actually bigger than all_queries_mem_used,
// because memory of page cache, allocator cache, segment cache etc. are
included
- // in process_mem_used.
+ // in proc_vm_rss.
// we count these cache memories equally on workload groups.
double ratio = (double)proc_vm_rss / (double)all_queries_mem_used;
if (ratio <= 1.25) {
- auto sys_mem_available = doris::MemInfo::sys_mem_available();
std::string debug_msg = fmt::format(
"\nProcess Memory Summary: process_vm_rss: {}, process mem:
{}, sys mem available: "
"{}, all quries mem: {}",
PrettyPrinter::print(proc_vm_rss, TUnit::BYTES),
- PrettyPrinter::print(process_mem_used, TUnit::BYTES),
- PrettyPrinter::print(sys_mem_available, TUnit::BYTES),
+
PrettyPrinter::print(doris::GlobalMemoryArbitrator::process_memory_usage(),
+ TUnit::BYTES),
+ doris::MemInfo::sys_mem_available_str(),
PrettyPrinter::print(all_queries_mem_used, TUnit::BYTES));
LOG_EVERY_T(INFO, 10) << debug_msg;
}
diff --git a/be/src/util/mem_info.cpp b/be/src/util/mem_info.cpp
index ce42d69dccf..a2cb04049db 100644
--- a/be/src/util/mem_info.cpp
+++ b/be/src/util/mem_info.cpp
@@ -56,9 +56,6 @@ namespace doris {
bvar::PassiveStatus<int64_t> g_sys_mem_avail(
"meminfo_sys_mem_avail", [](void*) { return
MemInfo::sys_mem_available(); }, nullptr);
-bvar::PassiveStatus<int64_t> g_proc_mem_no_allocator_cache(
- "meminfo_proc_mem_no_allocator_cache",
- [](void*) { return MemInfo::proc_mem_no_allocator_cache(); }, nullptr);
bool MemInfo::_s_initialized = false;
std::atomic<int64_t> MemInfo::_s_physical_mem =
std::numeric_limits<int64_t>::max();
@@ -68,7 +65,6 @@ std::atomic<int64_t> MemInfo::_s_soft_mem_limit =
std::numeric_limits<int64_t>::
std::atomic<int64_t> MemInfo::_s_allocator_cache_mem = 0;
std::string MemInfo::_s_allocator_cache_mem_str = "";
std::atomic<int64_t> MemInfo::_s_virtual_memory_used = 0;
-std::atomic<int64_t> MemInfo::_s_proc_mem_no_allocator_cache = -1;
std::atomic<int64_t> MemInfo::refresh_interval_memory_growth = 0;
static std::unordered_map<std::string, int64_t> _mem_info_bytes;
@@ -129,7 +125,7 @@ bool MemInfo::process_minor_gc() {
std::string pre_sys_mem_available = MemInfo::sys_mem_available_str();
Defer defer {[&]() {
- notify_je_purge_dirty_pages();
+ MemInfo::notify_je_purge_dirty_pages();
std::stringstream ss;
profile->pretty_print(&ss);
LOG(INFO) << fmt::format(
@@ -139,16 +135,16 @@ bool MemInfo::process_minor_gc() {
}};
freed_mem +=
CacheManager::instance()->for_each_cache_prune_stale(profile.get());
- notify_je_purge_dirty_pages();
- if (freed_mem > _s_process_minor_gc_size) {
+ MemInfo::notify_je_purge_dirty_pages();
+ if (freed_mem > MemInfo::process_minor_gc_size()) {
return true;
}
if (config::enable_workload_group_memory_gc) {
RuntimeProfile* tg_profile = profile->create_child("WorkloadGroup",
true, true);
- freed_mem += tg_enable_overcommit_group_gc(_s_process_minor_gc_size -
freed_mem, tg_profile,
- true);
- if (freed_mem > _s_process_minor_gc_size) {
+ freed_mem +=
tg_enable_overcommit_group_gc(MemInfo::process_minor_gc_size() - freed_mem,
+ tg_profile, true);
+ if (freed_mem > MemInfo::process_minor_gc_size()) {
return true;
}
}
@@ -160,9 +156,9 @@ bool MemInfo::process_minor_gc() {
RuntimeProfile* toq_profile =
profile->create_child("FreeTopOvercommitMemoryQuery", true,
true);
freed_mem += MemTrackerLimiter::free_top_overcommit_query(
- _s_process_minor_gc_size - freed_mem, pre_vm_rss,
pre_sys_mem_available,
+ MemInfo::process_minor_gc_size() - freed_mem, pre_vm_rss,
pre_sys_mem_available,
toq_profile);
- if (freed_mem > _s_process_minor_gc_size) {
+ if (freed_mem > MemInfo::process_minor_gc_size()) {
return true;
}
}
@@ -183,7 +179,7 @@ bool MemInfo::process_full_gc() {
std::string pre_sys_mem_available = MemInfo::sys_mem_available_str();
Defer defer {[&]() {
- notify_je_purge_dirty_pages();
+ MemInfo::notify_je_purge_dirty_pages();
std::stringstream ss;
profile->pretty_print(&ss);
LOG(INFO) << fmt::format(
@@ -193,16 +189,16 @@ bool MemInfo::process_full_gc() {
}};
freed_mem +=
CacheManager::instance()->for_each_cache_prune_all(profile.get());
- notify_je_purge_dirty_pages();
- if (freed_mem > _s_process_full_gc_size) {
+ MemInfo::notify_je_purge_dirty_pages();
+ if (freed_mem > MemInfo::process_full_gc_size()) {
return true;
}
if (config::enable_workload_group_memory_gc) {
RuntimeProfile* tg_profile = profile->create_child("WorkloadGroup",
true, true);
- freed_mem += tg_enable_overcommit_group_gc(_s_process_full_gc_size -
freed_mem, tg_profile,
- false);
- if (freed_mem > _s_process_full_gc_size) {
+ freed_mem +=
tg_enable_overcommit_group_gc(MemInfo::process_full_gc_size() - freed_mem,
+ tg_profile, false);
+ if (freed_mem > MemInfo::process_full_gc_size()) {
return true;
}
}
@@ -211,8 +207,9 @@ bool MemInfo::process_full_gc() {
"[MemoryGC] before free top memory query in full GC",
MemTrackerLimiter::Type::QUERY);
RuntimeProfile* tmq_profile = profile->create_child("FreeTopMemoryQuery",
true, true);
freed_mem += MemTrackerLimiter::free_top_memory_query(
- _s_process_full_gc_size - freed_mem, pre_vm_rss,
pre_sys_mem_available, tmq_profile);
- if (freed_mem > _s_process_full_gc_size) {
+ MemInfo::process_full_gc_size() - freed_mem, pre_vm_rss,
pre_sys_mem_available,
+ tmq_profile);
+ if (freed_mem > MemInfo::process_full_gc_size()) {
return true;
}
@@ -223,9 +220,9 @@ bool MemInfo::process_full_gc() {
RuntimeProfile* tol_profile =
profile->create_child("FreeTopMemoryOvercommitLoad", true,
true);
freed_mem += MemTrackerLimiter::free_top_overcommit_load(
- _s_process_full_gc_size - freed_mem, pre_vm_rss,
pre_sys_mem_available,
+ MemInfo::process_full_gc_size() - freed_mem, pre_vm_rss,
pre_sys_mem_available,
tol_profile);
- if (freed_mem > _s_process_full_gc_size) {
+ if (freed_mem > MemInfo::process_full_gc_size()) {
return true;
}
}
@@ -233,15 +230,13 @@ bool MemInfo::process_full_gc() {
VLOG_NOTICE << MemTrackerLimiter::type_detail_usage(
"[MemoryGC] before free top memory load in full GC",
MemTrackerLimiter::Type::LOAD);
RuntimeProfile* tml_profile = profile->create_child("FreeTopMemoryLoad",
true, true);
- freed_mem += MemTrackerLimiter::free_top_memory_load(
- _s_process_full_gc_size - freed_mem, pre_vm_rss,
pre_sys_mem_available, tml_profile);
- if (freed_mem > _s_process_full_gc_size) {
- return true;
- }
- return false;
+ freed_mem +=
+
MemTrackerLimiter::free_top_memory_load(MemInfo::process_full_gc_size() -
freed_mem,
+ pre_vm_rss,
pre_sys_mem_available, tml_profile);
+ return freed_mem > MemInfo::process_full_gc_size();
}
-int64_t MemInfo::tg_not_enable_overcommit_group_gc() {
+int64_t MemInfo::tg_disable_overcommit_group_gc() {
MonotonicStopWatch watch;
watch.start();
std::vector<WorkloadGroupPtr> task_groups;
diff --git a/be/src/util/mem_info.h b/be/src/util/mem_info.h
index f6c9407d73e..5606ebd45d6 100644
--- a/be/src/util/mem_info.h
+++ b/be/src/util/mem_info.h
@@ -92,6 +92,12 @@ public:
static inline int64_t sys_mem_available_warning_water_mark() {
return _s_sys_mem_available_warning_water_mark;
}
+ static inline int64_t process_minor_gc_size() {
+ return _s_process_minor_gc_size.load(std::memory_order_relaxed);
+ }
+ static inline int64_t process_full_gc_size() {
+ return _s_process_full_gc_size.load(std::memory_order_relaxed);
+ }
static inline int64_t get_tc_metrics(const std::string& name) {
#ifndef USE_JEMALLOC
@@ -152,30 +158,11 @@ public:
return _s_allocator_cache_mem.load(std::memory_order_relaxed);
}
static inline std::string allocator_cache_mem_str() { return
_s_allocator_cache_mem_str; }
- static inline int64_t proc_mem_no_allocator_cache() {
- return _s_proc_mem_no_allocator_cache.load(std::memory_order_relaxed) +
- refresh_interval_memory_growth;
- }
// Tcmalloc property `generic.total_physical_bytes` records the total
length of the virtual memory
// obtained by the process malloc, not the physical memory actually used
by the process in the OS.
static void refresh_allocator_mem();
- /** jemalloc pdirty is number of pages within unused extents that are
potentially
- * dirty, and for which madvise() or similar has not been called.
- *
- * So they will be subtracted from RSS to make accounting more
- * accurate, since those pages are not really RSS but a memory
- * that can be used at anytime via jemalloc.
- */
- static inline void refresh_proc_mem_no_allocator_cache() {
- _s_proc_mem_no_allocator_cache.store(
- PerfCounters::get_vm_rss() -
static_cast<int64_t>(_s_allocator_cache_mem.load(
-
std::memory_order_relaxed)),
- std::memory_order_relaxed);
- refresh_interval_memory_growth = 0;
- }
-
static inline int64_t mem_limit() {
DCHECK(_s_initialized);
return _s_mem_limit.load(std::memory_order_relaxed);
@@ -193,17 +180,13 @@ public:
return
PrettyPrinter::print(_s_soft_mem_limit.load(std::memory_order_relaxed),
TUnit::BYTES);
}
- static bool is_exceed_soft_mem_limit(int64_t bytes = 0) {
- return proc_mem_no_allocator_cache() + bytes >= soft_mem_limit() ||
- sys_mem_available() < sys_mem_available_warning_water_mark();
- }
static std::string debug_string();
static bool process_minor_gc();
static bool process_full_gc();
- static int64_t tg_not_enable_overcommit_group_gc();
+ static int64_t tg_disable_overcommit_group_gc();
static int64_t tg_enable_overcommit_group_gc(int64_t request_free_memory,
RuntimeProfile* profile, bool
is_minor_gc);
@@ -220,7 +203,6 @@ private:
static std::atomic<int64_t> _s_allocator_cache_mem;
static std::string _s_allocator_cache_mem_str;
static std::atomic<int64_t> _s_virtual_memory_used;
- static std::atomic<int64_t> _s_proc_mem_no_allocator_cache;
static std::atomic<int64_t> _s_sys_mem_available;
static int64_t _s_sys_mem_available_low_water_mark;
diff --git a/be/src/vec/common/allocator.cpp b/be/src/vec/common/allocator.cpp
index a72165f936c..f6758a2dbb9 100644
--- a/be/src/vec/common/allocator.cpp
+++ b/be/src/vec/common/allocator.cpp
@@ -28,6 +28,7 @@
// Allocator is used by too many files. For compilation speed, put
dependencies in `.cpp` as much as possible.
#include "runtime/fragment_mgr.h"
+#include "runtime/memory/global_memory_arbitrator.h"
#include "runtime/memory/mem_tracker_limiter.h"
#include "runtime/memory/thread_mem_tracker_mgr.h"
#include "runtime/thread_context.h"
@@ -46,7 +47,7 @@ void Allocator<clear_memory_, mmap_populate,
use_mmap>::sys_memory_check(size_t
if (doris::thread_context()->skip_memory_check != 0) {
return;
}
- if (doris::MemTrackerLimiter::sys_mem_exceed_limit_check(size)) {
+ if (doris::GlobalMemoryArbitrator::is_exceed_hard_mem_limit(size)) {
// Only thread attach query, and has not completely waited for
thread_wait_gc_max_milliseconds,
// will wait for gc, asynchronous cancel or throw bad::alloc.
// Otherwise, if the external catch, directly throw bad::alloc.
@@ -58,9 +59,9 @@ void Allocator<clear_memory_, mmap_populate,
use_mmap>::sys_memory_check(size_t
doris::thread_context()->thread_mem_tracker()->peak_consumption(),
doris::thread_context()->thread_mem_tracker()->consumption(),
doris::thread_context()->thread_mem_tracker_mgr->last_consumer_tracker(),
- doris::MemTrackerLimiter::process_limit_exceeded_errmsg_str());
+
doris::GlobalMemoryArbitrator::process_limit_exceeded_errmsg_str());
- if (size > 1024l * 1024 * 1024 &&
!doris::enable_thread_catch_bad_alloc &&
+ if (size > 1024L * 1024 * 1024 &&
!doris::enable_thread_catch_bad_alloc &&
!doris::config::disable_memory_gc) { // 1G
err_msg += "\nAlloc Stacktrace:\n" + doris::get_stack_trace();
}
@@ -83,7 +84,7 @@ void Allocator<clear_memory_, mmap_populate,
use_mmap>::sys_memory_check(size_t
doris::config::thread_wait_gc_max_milliseconds, err_msg);
while (wait_milliseconds <
doris::config::thread_wait_gc_max_milliseconds) {
std::this_thread::sleep_for(std::chrono::milliseconds(100));
- if
(!doris::MemTrackerLimiter::sys_mem_exceed_limit_check(size)) {
+ if
(!doris::GlobalMemoryArbitrator::is_exceed_hard_mem_limit(size)) {
doris::MemInfo::refresh_interval_memory_growth += size;
break;
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]