This is an automated email from the ASF dual-hosted git repository.
zouxinyi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 4c74406734a [opt](memory) Support Memory Profile (#41310)
4c74406734a is described below
commit 4c74406734aba4f9607433bedfcfe136420c3cc0
Author: Xinyi Zou <[email protected]>
AuthorDate: Sat Oct 12 13:25:42 2024 +0800
[opt](memory) Support Memory Profile (#41310)
1. Allow memory trackers to create Memory Profile snapshots.
2. Remove the BE `mem_tracker` web page and replace it with the more
intuitive Memory Profile (served by the new `/profile` page).
3. Make all memory logs print the more intuitive Memory Profile as well.
---
be/src/common/config.cpp | 5 +-
be/src/common/config.h | 5 +-
be/src/common/daemon.cpp | 30 +-
be/src/http/default_path_handlers.cpp | 155 ++++-----
be/src/olap/rowset/beta_rowset_writer.cpp | 2 +-
be/src/olap/rowset/segcompaction.cpp | 12 +-
be/src/olap/rowset/segcompaction.h | 2 +-
be/src/olap/task/engine_publish_version_task.cpp | 13 +-
be/src/runtime/exec_env.h | 4 +
be/src/runtime/exec_env_init.cpp | 4 +
be/src/runtime/memory/cache_policy.h | 2 +-
be/src/runtime/memory/global_memory_arbitrator.cpp | 13 +-
be/src/runtime/memory/global_memory_arbitrator.h | 22 +-
be/src/runtime/memory/mem_tracker_limiter.cpp | 300 ++++-------------
be/src/runtime/memory/mem_tracker_limiter.h | 127 +++-----
be/src/runtime/memory/memory_profile.cpp | 353 +++++++++++++++++++++
be/src/runtime/memory/memory_profile.h | 82 +++++
be/src/runtime/memory/memory_reclamation.cpp | 46 ++-
be/src/runtime/memory/thread_mem_tracker_mgr.h | 2 +-
be/src/runtime/process_profile.cpp | 44 +++
be/src/runtime/process_profile.h | 62 ++++
.../routine_load/routine_load_task_executor.cpp | 4 +-
be/src/util/mem_info.cpp | 19 +-
be/src/util/mem_info.h | 18 +-
be/src/util/runtime_profile.cpp | 6 +-
be/src/util/runtime_profile.h | 4 +-
be/src/vec/common/allocator.cpp | 9 +-
be/test/runtime/memory/mem_tracker_test.cpp | 2 +-
be/test/testutil/run_all_tests.cpp | 2 +
29 files changed, 865 insertions(+), 484 deletions(-)
diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp
index e805da4e62a..5527ab07885 100644
--- a/be/src/common/config.cpp
+++ b/be/src/common/config.cpp
@@ -150,7 +150,10 @@ DEFINE_mInt64(stacktrace_in_alloc_large_memory_bytes,
"2147483648");
DEFINE_mInt64(crash_in_alloc_large_memory_bytes, "-1");
-// If memory tracker value is inaccurate, BE will crash. usually used in test
environments, default value is false.
+// The actual meaning of this parameter is `debug_memory`.
+// 1. crash in memory tracker inaccurate, if memory tracker value is
inaccurate, BE will crash.
+// usually used in test environments, default value is false.
+// 2. print more memory logs.
DEFINE_mBool(crash_in_memory_tracker_inaccurate, "false");
// default is true. if any memory tracking in Orphan mem tracker will report
error.
diff --git a/be/src/common/config.h b/be/src/common/config.h
index 94435bf83fc..e2789913703 100644
--- a/be/src/common/config.h
+++ b/be/src/common/config.h
@@ -200,7 +200,10 @@ DECLARE_mInt64(stacktrace_in_alloc_large_memory_bytes);
// modify this parameter to crash when large memory allocation occur will help
DECLARE_mInt64(crash_in_alloc_large_memory_bytes);
-// If memory tracker value is inaccurate, BE will crash. usually used in test
environments, default value is false.
+// The actual meaning of this parameter is `debug_memory`.
+// 1. crash in memory tracker inaccurate, if memory tracker value is
inaccurate, BE will crash.
+// usually used in test environments, default value is false.
+// 2. print more memory logs.
DECLARE_mBool(crash_in_memory_tracker_inaccurate);
// default is true. if any memory tracking in Orphan mem tracker will report
error.
diff --git a/be/src/common/daemon.cpp b/be/src/common/daemon.cpp
index 27fbfb71d7f..ce2a6878dba 100644
--- a/be/src/common/daemon.cpp
+++ b/be/src/common/daemon.cpp
@@ -27,17 +27,13 @@
// IWYU pragma: no_include <bits/std_abs.h>
#include <butil/iobuf.h>
#include <math.h>
-#include <signal.h>
#include <stdint.h>
#include <stdlib.h>
-#include <string.h>
-#include <algorithm>
// IWYU pragma: no_include <bits/chrono.h>
#include <chrono> // IWYU pragma: keep
#include <map>
#include <ostream>
-#include <set>
#include <string>
#include "cloud/config.h"
@@ -45,30 +41,23 @@
#include "common/logging.h"
#include "common/status.h"
#include "olap/memtable_memory_limiter.h"
-#include "olap/options.h"
#include "olap/storage_engine.h"
#include "olap/tablet_manager.h"
#include "runtime/be_proc_monitor.h"
-#include "runtime/client_cache.h"
#include "runtime/exec_env.h"
#include "runtime/fragment_mgr.h"
#include "runtime/memory/global_memory_arbitrator.h"
-#include "runtime/memory/mem_tracker.h"
#include "runtime/memory/mem_tracker_limiter.h"
#include "runtime/memory/memory_reclamation.h"
+#include "runtime/process_profile.h"
#include "runtime/runtime_query_statistics_mgr.h"
#include "runtime/workload_group/workload_group_manager.h"
#include "util/algorithm_util.h"
-#include "util/cpu_info.h"
-#include "util/debug_util.h"
-#include "util/disk_info.h"
#include "util/doris_metrics.h"
#include "util/mem_info.h"
#include "util/metrics.h"
-#include "util/network_util.h"
#include "util/perf_counters.h"
#include "util/system_metrics.h"
-#include "util/thrift_util.h"
#include "util/time.h"
namespace doris {
@@ -233,9 +222,8 @@ void refresh_memory_state_after_memory_change() {
if (abs(last_print_proc_mem - PerfCounters::get_vm_rss()) > 268435456) {
last_print_proc_mem = PerfCounters::get_vm_rss();
doris::MemTrackerLimiter::clean_tracker_limiter_group();
- doris::MemTrackerLimiter::enable_print_log_process_usage();
- // Refresh mem tracker each type counter.
- doris::MemTrackerLimiter::refresh_global_counter();
+
doris::ProcessProfile::instance()->memory_profile()->enable_print_log_process_usage();
+
doris::ProcessProfile::instance()->memory_profile()->refresh_memory_overview_profile();
LOG(INFO) << doris::GlobalMemoryArbitrator::
process_mem_log_str(); // print mem log when memory
state by 256M
}
@@ -339,10 +327,12 @@ void Daemon::memory_gc_thread() {
memory_full_gc_sleep_time_ms = memory_gc_sleep_time_ms;
memory_minor_gc_sleep_time_ms = memory_gc_sleep_time_ms;
LOG(INFO) << fmt::format("[MemoryGC] start full GC, {}.",
mem_info);
- doris::MemTrackerLimiter::print_log_process_usage();
+
doris::ProcessProfile::instance()->memory_profile()->print_log_process_usage();
if
(doris::MemoryReclamation::process_full_gc(std::move(mem_info))) {
// If there is not enough memory to be gc, the process memory
usage will not be printed in the next continuous gc.
- doris::MemTrackerLimiter::enable_print_log_process_usage();
+ doris::ProcessProfile::instance()
+ ->memory_profile()
+ ->enable_print_log_process_usage();
}
} else if (memory_minor_gc_sleep_time_ms <= 0 &&
(sys_mem_available <
doris::MemInfo::sys_mem_available_warning_water_mark() ||
@@ -352,9 +342,11 @@ void Daemon::memory_gc_thread() {
doris::GlobalMemoryArbitrator::process_soft_limit_exceeded_errmsg_str();
memory_minor_gc_sleep_time_ms = memory_gc_sleep_time_ms;
LOG(INFO) << fmt::format("[MemoryGC] start minor GC, {}.",
mem_info);
- doris::MemTrackerLimiter::print_log_process_usage();
+
doris::ProcessProfile::instance()->memory_profile()->print_log_process_usage();
if
(doris::MemoryReclamation::process_minor_gc(std::move(mem_info))) {
- doris::MemTrackerLimiter::enable_print_log_process_usage();
+ doris::ProcessProfile::instance()
+ ->memory_profile()
+ ->enable_print_log_process_usage();
}
} else {
if (memory_full_gc_sleep_time_ms > 0) {
diff --git a/be/src/http/default_path_handlers.cpp
b/be/src/http/default_path_handlers.cpp
index 2ece1e3fdcd..e018fb04f06 100644
--- a/be/src/http/default_path_handlers.cpp
+++ b/be/src/http/default_path_handlers.cpp
@@ -38,13 +38,9 @@
#include <vector>
#include "common/config.h"
-#include "gutil/strings/numbers.h"
-#include "gutil/strings/substitute.h"
#include "http/action/tablets_info_action.h"
#include "http/web_page_handler.h"
-#include "runtime/memory/global_memory_arbitrator.h"
-#include "runtime/memory/mem_tracker.h"
-#include "runtime/memory/mem_tracker_limiter.h"
+#include "runtime/process_profile.h"
#include "util/easy_json.h"
#include "util/mem_info.h"
#include "util/perf_counters.h"
@@ -97,16 +93,51 @@ void config_handler(const WebPageHandler::ArgumentMap&
args, std::stringstream*
(*output) << "</pre>";
}
-// Registered to handle "/memz", and prints out memory allocation statistics.
-void mem_usage_handler(const WebPageHandler::ArgumentMap& args,
std::stringstream* output) {
- (*output) << "<pre>"
- << "Mem Limit: " << PrettyPrinter::print(MemInfo::mem_limit(),
TUnit::BYTES)
+void memory_info_handler(std::stringstream* output) {
+ (*output) << "<h2>Memory Info</h2>\n";
+ (*output) << "<pre>";
+ (*output) << "<h4 id=\"memoryDocumentsTitle\">Memory Documents</h4>\n"
+ << "<a "
+
"href=https://doris.apache.org/zh-CN/docs/dev/admin-manual/memory-management/"
+ "overview>Memory Management Overview</a>\n"
+ << "<a "
+
"href=https://doris.apache.org/zh-CN/docs/dev/admin-manual/memory-management/"
+ "memory-issue-faq>Memory Issue FAQ</a>\n"
+ << "\n---\n";
+
+ (*output) << "<h4 id=\"memoryPropertiesTitle\">Memory Properties</h4>\n"
+ << "System Physical Mem: "
+ << PrettyPrinter::print(MemInfo::physical_mem(), TUnit::BYTES)
<< std::endl
+ << "System Page Size: " << MemInfo::get_page_size() << std::endl
+ << "Mem Limit: " << MemInfo::mem_limit_str() << std::endl
+ << "Soft Mem Limit: " << MemInfo::soft_mem_limit_str() <<
std::endl
+ << "System Mem Available Low Water Mark: "
+ <<
PrettyPrinter::print(MemInfo::sys_mem_available_low_water_mark(), TUnit::BYTES)
+ << std::endl
+ << "System Mem Available Warning Water Mark: "
+ <<
PrettyPrinter::print(MemInfo::sys_mem_available_warning_water_mark(),
TUnit::BYTES)
<< std::endl
- << "Physical Mem From Perf: "
- << PrettyPrinter::print(PerfCounters::get_vm_rss(),
TUnit::BYTES) << std::endl
- << "</pre>";
+ << "Cgroup Mem Limit: "
+ << PrettyPrinter::print(MemInfo::cgroup_mem_limit(),
TUnit::BYTES) << std::endl
+ << "Cgroup Mem Usage: "
+ << PrettyPrinter::print(MemInfo::cgroup_mem_usage(),
TUnit::BYTES) << std::endl
+ << "Cgroup Mem Refresh State: " <<
MemInfo::cgroup_mem_refresh_state() << std::endl
+ << "\n---\n";
+
+ (*output) << "<h4 id=\"memoryOptionSettingsTitle\">Memory Option
Settings</h4>\n";
+ {
+ std::lock_guard<std::mutex>
lock(*config::get_mutable_string_config_lock());
+ for (const auto& it : *(config::full_conf_map)) {
+ if (it.first.find("memory") != std::string::npos ||
+ it.first.find("cache") != std::string::npos ||
+ it.first.find("mem") != std::string::npos) {
+ (*output) << it.first << "=" << it.second << std::endl;
+ }
+ }
+ }
+ (*output) << "\n---\n";
- (*output) << "<pre>";
+ (*output) << "<h4 id=\"jemallocProfilesTitle\">Jemalloc Profiles</h4>\n";
#if defined(ADDRESS_SANITIZER) || defined(LEAK_SANITIZER) ||
defined(THREAD_SANITIZER)
(*output) << "Memory tracking is not available with address sanitizer
builds.";
#elif defined(USE_JEMALLOC)
@@ -117,15 +148,25 @@ void mem_usage_handler(const WebPageHandler::ArgumentMap&
args, std::stringstrea
};
jemalloc_stats_print(write_cb, &tmp, "a");
boost::replace_all(tmp, "\n", "<br>");
- (*output) << tmp << "</pre>";
+ (*output) << tmp;
#else
char buf[2048];
MallocExtension::instance()->GetStats(buf, 2048);
// Replace new lines with <br> for html
std::string tmp(buf);
boost::replace_all(tmp, "\n", "<br>");
- (*output) << tmp << "</pre>";
+ (*output) << tmp;
#endif
+ (*output) << "</pre>";
+}
+
+// Registered to handle "/profile".
+void process_profile_handler(const WebPageHandler::ArgumentMap& args,
std::stringstream* output) {
+ (*output) << "<h2 id=\"processProfileTitle\">Process Profile</h2>\n";
+ doris::ProcessProfile::instance()->refresh_profile();
+ (*output) << "<pre id=\"processProfile\">"
+ <<
doris::ProcessProfile::instance()->print_process_profile_no_root() << "</pre>";
+ memory_info_handler(output);
}
void display_tablets_callback(const WebPageHandler::ArgumentMap& args,
EasyJson* ej) {
@@ -141,76 +182,8 @@ void display_tablets_callback(const
WebPageHandler::ArgumentMap& args, EasyJson*
// Registered to handle "/mem_tracker", and prints out memory tracker
information.
void mem_tracker_handler(const WebPageHandler::ArgumentMap& args,
std::stringstream* output) {
- (*output) << "<h1>Memory usage by subsystem</h1>\n";
- std::vector<MemTrackerLimiter::Snapshot> snapshots;
- auto iter = args.find("type");
- if (iter != args.end()) {
- if (iter->second == "global") {
- MemTrackerLimiter::make_type_snapshots(&snapshots,
MemTrackerLimiter::Type::GLOBAL);
- } else if (iter->second == "query") {
- MemTrackerLimiter::make_type_snapshots(&snapshots,
MemTrackerLimiter::Type::QUERY);
- } else if (iter->second == "load") {
- MemTrackerLimiter::make_type_snapshots(&snapshots,
MemTrackerLimiter::Type::LOAD);
- } else if (iter->second == "compaction") {
- MemTrackerLimiter::make_type_snapshots(&snapshots,
MemTrackerLimiter::Type::COMPACTION);
- } else if (iter->second == "schema_change") {
- MemTrackerLimiter::make_type_snapshots(&snapshots,
-
MemTrackerLimiter::Type::SCHEMA_CHANGE);
- } else if (iter->second == "other") {
- MemTrackerLimiter::make_type_snapshots(&snapshots,
MemTrackerLimiter::Type::OTHER);
- } else if (iter->second == "reserved_memory") {
-
MemTrackerLimiter::make_all_reserved_trackers_snapshots(&snapshots);
- } else if (iter->second == "all") {
- MemTrackerLimiter::make_all_memory_state_snapshots(&snapshots);
- }
- } else {
- (*output) << "<h4>*Notice:</h4>\n";
- (*output) << "<h4> 1. MemTracker only counts the memory on part of
the main execution "
- "path, "
- "which is usually less than the real process
memory.</h4>\n";
- (*output) << "<h4> 2. each `type` is the sum of a set of tracker
values, "
- "`sum of all trackers` is the sum of all trackers of all
types, .</h4>\n";
- (*output) << "<h4> 3. `process resident memory` is the physical
memory of the process, "
- "from /proc VmRSS VmHWM.</h4>\n";
- (*output) << "<h4> 4. `process virtual memory` is the virtual
memory of the process, "
- "from /proc VmSize VmPeak.</h4>\n";
- (*output) << "<h4> 5.`/mem_tracker?type=<type name>` to view the
memory details of each "
- "type, for example, `/mem_tracker?type=query` will list
the memory of all "
- "queries; "
- "`/mem_tracker?type=global` will list the memory of all
Cache, metadata and "
- "other "
- "global life cycles.</h4>\n";
- (*output) << "<h4>see documentation for details.";
- MemTrackerLimiter::make_process_snapshots(&snapshots);
- }
-
- (*output) << "<table data-toggle='table' "
- " data-pagination='true' "
- " data-search='true' "
- " class='table table-striped'>\n";
- (*output) << "<thead><tr>"
- "<th data-sortable='true'>Type</th>"
- "<th data-sortable='true'>Label</th>"
- "<th>Limit</th>"
- "<th data-sortable='true' "
- ">Current Consumption(Bytes)</th>"
- "<th>Current Consumption(Normalize)</th>"
- "<th data-sortable='true' "
- ">Peak Consumption(Bytes)</th>"
- "<th>Peak Consumption(Normalize)</th>"
- "</tr></thead>";
- (*output) << "<tbody>\n";
- for (const auto& item : snapshots) {
- string limit_str = item.limit == -1 ? "none" :
AccurateItoaKMGT(item.limit);
- string current_consumption_normalize =
AccurateItoaKMGT(item.cur_consumption);
- string peak_consumption_normalize =
AccurateItoaKMGT(item.peak_consumption);
- (*output) << strings::Substitute(
-
"<tr><td>$0</td><td>$1</td><td>$2</td><td>$3</td><td>$4</td><td>$5</td><td>$6</"
- "td></tr>\n",
- item.type, item.label, limit_str, item.cur_consumption,
- current_consumption_normalize, item.peak_consumption,
peak_consumption_normalize);
- }
- (*output) << "</tbody></table>\n";
+ (*output) << "<h2>mem_tracker webpage has been offline, please click <a "
+ "href=../profile>Process Profile</a>, see MemoryProfile and
Memory Info</h2>\n";
}
void heap_handler(const WebPageHandler::ArgumentMap& args, std::stringstream*
output) {
@@ -394,14 +367,10 @@ void add_default_path_handlers(WebPageHandler*
web_page_handler) {
web_page_handler->register_page("/varz", "Configs", config_handler,
true /* is_on_nav_bar */);
}
- web_page_handler->register_page("/memz", "Memory", mem_usage_handler, true
/* is_on_nav_bar */);
- web_page_handler->register_page(
- "/mem_tracker", "MemTracker",
- [](auto&& PH1, auto&& PH2) {
- return mem_tracker_handler(std::forward<decltype(PH1)>(PH1),
- std::forward<decltype(PH2)>(PH2));
- },
- true /* is_on_nav_bar */);
+ web_page_handler->register_page("/profile", "Process Profile",
process_profile_handler,
+ true /* is_on_nav_bar */);
+ web_page_handler->register_page("/mem_tracker", "MemTracker",
mem_tracker_handler,
+ true /* is_on_nav_bar */);
web_page_handler->register_page("/heap", "Heap Profile", heap_handler,
true /* is_on_nav_bar */);
web_page_handler->register_page("/cpu", "CPU Profile", cpu_handler, true
/* is_on_nav_bar */);
diff --git a/be/src/olap/rowset/beta_rowset_writer.cpp
b/be/src/olap/rowset/beta_rowset_writer.cpp
index d5d3458dc3e..5d1b80f8cd7 100644
--- a/be/src/olap/rowset/beta_rowset_writer.cpp
+++ b/be/src/olap/rowset/beta_rowset_writer.cpp
@@ -291,7 +291,7 @@ Status
BaseBetaRowsetWriter::_generate_delete_bitmap(int32_t segment_id) {
Status BetaRowsetWriter::init(const RowsetWriterContext&
rowset_writer_context) {
RETURN_IF_ERROR(BaseBetaRowsetWriter::init(rowset_writer_context));
if (_segcompaction_worker) {
- _segcompaction_worker->init_mem_tracker(rowset_writer_context.txn_id);
+ _segcompaction_worker->init_mem_tracker(rowset_writer_context);
}
return Status::OK();
}
diff --git a/be/src/olap/rowset/segcompaction.cpp
b/be/src/olap/rowset/segcompaction.cpp
index d6bdb9387e9..e5d043d8a22 100644
--- a/be/src/olap/rowset/segcompaction.cpp
+++ b/be/src/olap/rowset/segcompaction.cpp
@@ -69,9 +69,17 @@ using namespace ErrorCode;
SegcompactionWorker::SegcompactionWorker(BetaRowsetWriter* writer) :
_writer(writer) {}
-void SegcompactionWorker::init_mem_tracker(int64_t txn_id) {
+void SegcompactionWorker::init_mem_tracker(const RowsetWriterContext&
rowset_writer_context) {
_seg_compact_mem_tracker = MemTrackerLimiter::create_shared(
- MemTrackerLimiter::Type::COMPACTION, "segcompaction-" +
std::to_string(txn_id));
+ MemTrackerLimiter::Type::COMPACTION,
+
fmt::format("segcompaction-txnID_{}-loadID_{}-tabletID_{}-indexID_{}-"
+ "partitionID_{}-version_{}",
+ std::to_string(rowset_writer_context.txn_id),
+ print_id(rowset_writer_context.load_id),
+ std::to_string(rowset_writer_context.tablet_id),
+ std::to_string(rowset_writer_context.index_id),
+ std::to_string(rowset_writer_context.partition_id),
+ rowset_writer_context.version.to_string()));
}
Status SegcompactionWorker::_get_segcompaction_reader(
diff --git a/be/src/olap/rowset/segcompaction.h
b/be/src/olap/rowset/segcompaction.h
index d498a5b8e33..54c5c3758c2 100644
--- a/be/src/olap/rowset/segcompaction.h
+++ b/be/src/olap/rowset/segcompaction.h
@@ -73,7 +73,7 @@ public:
// set the cancel flag, tasks already started will not be cancelled.
bool cancel();
- void init_mem_tracker(int64_t txn_id);
+ void init_mem_tracker(const RowsetWriterContext& rowset_writer_context);
private:
Status _create_segment_writer_for_segcompaction(
diff --git a/be/src/olap/task/engine_publish_version_task.cpp
b/be/src/olap/task/engine_publish_version_task.cpp
index dae4c6be814..75e589f3b97 100644
--- a/be/src/olap/task/engine_publish_version_task.cpp
+++ b/be/src/olap/task/engine_publish_version_task.cpp
@@ -82,8 +82,10 @@ EnginePublishVersionTask::EnginePublishVersionTask(
_succ_tablets(succ_tablets),
_discontinuous_version_tablets(discontinuous_version_tablets),
_table_id_to_tablet_id_to_num_delta_rows(table_id_to_tablet_id_to_num_delta_rows)
{
- _mem_tracker =
MemTrackerLimiter::create_shared(MemTrackerLimiter::Type::OTHER,
- "TabletPublishTxnTask");
+ _mem_tracker = MemTrackerLimiter::create_shared(
+ MemTrackerLimiter::Type::OTHER,
+ fmt::format("EnginePublishVersionTask-transactionID_{}",
+ std::to_string(_publish_version_req.transaction_id)));
}
void EnginePublishVersionTask::add_error_tablet_id(int64_t tablet_id) {
@@ -381,8 +383,11 @@ TabletPublishTxnTask::TabletPublishTxnTask(StorageEngine&
engine,
_transaction_id(transaction_id),
_version(version),
_tablet_info(tablet_info),
-
_mem_tracker(MemTrackerLimiter::create_shared(MemTrackerLimiter::Type::OTHER,
-
"TabletPublishTxnTask")) {
+ _mem_tracker(MemTrackerLimiter::create_shared(
+ MemTrackerLimiter::Type::OTHER,
+
fmt::format("TabletPublishTxnTask-partitionID_{}-transactionID_{}-version_{}",
+ std::to_string(partition_id),
std::to_string(transaction_id),
+ version.to_string()))) {
_stats.submit_time_us = MonotonicMicros();
}
diff --git a/be/src/runtime/exec_env.h b/be/src/runtime/exec_env.h
index 61cebad10b9..399c2a7ce05 100644
--- a/be/src/runtime/exec_env.h
+++ b/be/src/runtime/exec_env.h
@@ -109,6 +109,7 @@ class LookupConnectionCache;
class RowCache;
class DummyLRUCache;
class CacheManager;
+class ProcessProfile;
class WalManager;
class DNSCache;
@@ -271,6 +272,7 @@ public:
void set_storage_engine(std::unique_ptr<BaseStorageEngine>&& engine);
void set_cache_manager(CacheManager* cm) { this->_cache_manager = cm; }
+ void set_process_profile(ProcessProfile* pp) { this->_process_profile =
pp; }
void set_tablet_schema_cache(TabletSchemaCache* c) {
this->_tablet_schema_cache = c; }
void set_storage_page_cache(StoragePageCache* c) {
this->_storage_page_cache = c; }
void set_segment_loader(SegmentLoader* sl) { this->_segment_loader = sl; }
@@ -303,6 +305,7 @@ public:
LookupConnectionCache* get_lookup_connection_cache() { return
_lookup_connection_cache; }
RowCache* get_row_cache() { return _row_cache; }
CacheManager* get_cache_manager() { return _cache_manager; }
+ ProcessProfile* get_process_profile() { return _process_profile; }
segment_v2::InvertedIndexSearcherCache*
get_inverted_index_searcher_cache() {
return _inverted_index_searcher_cache;
}
@@ -441,6 +444,7 @@ private:
LookupConnectionCache* _lookup_connection_cache = nullptr;
RowCache* _row_cache = nullptr;
CacheManager* _cache_manager = nullptr;
+ ProcessProfile* _process_profile = nullptr;
segment_v2::InvertedIndexSearcherCache* _inverted_index_searcher_cache =
nullptr;
segment_v2::InvertedIndexQueryCache* _inverted_index_query_cache = nullptr;
QueryCache* _query_cache = nullptr;
diff --git a/be/src/runtime/exec_env_init.cpp b/be/src/runtime/exec_env_init.cpp
index adb6b7fd101..d9eedc6d8c5 100644
--- a/be/src/runtime/exec_env_init.cpp
+++ b/be/src/runtime/exec_env_init.cpp
@@ -74,6 +74,7 @@
#include "runtime/memory/mem_tracker.h"
#include "runtime/memory/mem_tracker_limiter.h"
#include "runtime/memory/thread_mem_tracker_mgr.h"
+#include "runtime/process_profile.h"
#include "runtime/result_buffer_mgr.h"
#include "runtime/result_queue_mgr.h"
#include "runtime/routine_load/routine_load_task_executor.h"
@@ -450,6 +451,7 @@ Status ExecEnv::_init_mem_env() {
bool is_percent = false;
std::stringstream ss;
// 1. init mem tracker
+ _process_profile = ProcessProfile::create_global_instance();
init_mem_tracker();
thread_context()->thread_mem_tracker_mgr->init();
#if defined(USE_MEM_TRACKER) && !defined(__SANITIZE_ADDRESS__) &&
!defined(ADDRESS_SANITIZER) && \
@@ -772,6 +774,8 @@ void ExecEnv::destroy() {
// dns cache is a global instance and need to be released at last
SAFE_DELETE(_dns_cache);
+ SAFE_DELETE(_process_profile);
+
_s_tracking_memory = false;
LOG(INFO) << "Doris exec envorinment is destoried.";
diff --git a/be/src/runtime/memory/cache_policy.h
b/be/src/runtime/memory/cache_policy.h
index 5241efb9c29..666d32bdb56 100644
--- a/be/src/runtime/memory/cache_policy.h
+++ b/be/src/runtime/memory/cache_policy.h
@@ -92,7 +92,7 @@ public:
case CacheType::FOR_UT_CACHE_NUMBER:
return "ForUTCacheNumber";
case CacheType::QUERY_CACHE:
- return "QUERY_CACHE";
+ return "QueryCache";
default:
LOG(FATAL) << "not match type of cache policy :" <<
static_cast<int>(type);
}
diff --git a/be/src/runtime/memory/global_memory_arbitrator.cpp
b/be/src/runtime/memory/global_memory_arbitrator.cpp
index 45d7781786f..0458dd72a33 100644
--- a/be/src/runtime/memory/global_memory_arbitrator.cpp
+++ b/be/src/runtime/memory/global_memory_arbitrator.cpp
@@ -19,6 +19,7 @@
#include <bvar/bvar.h>
+#include "runtime/process_profile.h"
#include "runtime/thread_context.h"
namespace doris {
@@ -33,7 +34,7 @@ bvar::PassiveStatus<int64_t> g_sys_mem_avail(
"meminfo_sys_mem_avail", [](void*) { return
GlobalMemoryArbitrator::sys_mem_available(); },
nullptr);
-std::atomic<int64_t> GlobalMemoryArbitrator::_s_process_reserved_memory = 0;
+std::atomic<int64_t> GlobalMemoryArbitrator::_process_reserved_memory = 0;
std::atomic<int64_t> GlobalMemoryArbitrator::refresh_interval_memory_growth =
0;
std::mutex GlobalMemoryArbitrator::cache_adjust_capacity_lock;
std::condition_variable GlobalMemoryArbitrator::cache_adjust_capacity_cv;
@@ -45,9 +46,10 @@ std::atomic<bool>
GlobalMemoryArbitrator::memtable_memory_refresh_notify {false}
bool GlobalMemoryArbitrator::try_reserve_process_memory(int64_t bytes) {
if (sys_mem_available() - bytes <
MemInfo::sys_mem_available_warning_water_mark()) {
+
doris::ProcessProfile::instance()->memory_profile()->print_log_process_usage();
return false;
}
- int64_t old_reserved_mem =
_s_process_reserved_memory.load(std::memory_order_relaxed);
+ int64_t old_reserved_mem =
_process_reserved_memory.load(std::memory_order_relaxed);
int64_t new_reserved_mem = 0;
do {
new_reserved_mem = old_reserved_mem + bytes;
@@ -55,15 +57,16 @@ bool
GlobalMemoryArbitrator::try_reserve_process_memory(int64_t bytes) {
refresh_interval_memory_growth.load(std::memory_order_relaxed) +
new_reserved_mem >=
MemInfo::soft_mem_limit())) {
+
doris::ProcessProfile::instance()->memory_profile()->print_log_process_usage();
return false;
}
- } while
(!_s_process_reserved_memory.compare_exchange_weak(old_reserved_mem,
new_reserved_mem,
-
std::memory_order_relaxed));
+ } while (!_process_reserved_memory.compare_exchange_weak(old_reserved_mem,
new_reserved_mem,
+
std::memory_order_relaxed));
return true;
}
void GlobalMemoryArbitrator::release_process_reserved_memory(int64_t bytes) {
- _s_process_reserved_memory.fetch_sub(bytes, std::memory_order_relaxed);
+ _process_reserved_memory.fetch_sub(bytes, std::memory_order_relaxed);
}
int64_t GlobalMemoryArbitrator::sub_thread_reserve_memory(int64_t bytes) {
diff --git a/be/src/runtime/memory/global_memory_arbitrator.h
b/be/src/runtime/memory/global_memory_arbitrator.h
index 1859f45391f..075113088fb 100644
--- a/be/src/runtime/memory/global_memory_arbitrator.h
+++ b/be/src/runtime/memory/global_memory_arbitrator.h
@@ -17,7 +17,7 @@
#pragma once
-#include "runtime/memory/mem_tracker_limiter.h"
+#include "runtime/process_profile.h"
#include "util/mem_info.h"
namespace doris {
@@ -107,7 +107,7 @@ public:
static void release_process_reserved_memory(int64_t bytes);
static inline int64_t process_reserved_memory() {
- return _s_process_reserved_memory.load(std::memory_order_relaxed);
+ return _process_reserved_memory.load(std::memory_order_relaxed);
}
// `process_memory_usage` includes all reserved memory. if a thread has
`reserved_memory`,
@@ -122,8 +122,12 @@ public:
if (bytes <= 0) {
return false;
}
- return process_memory_usage() + bytes >= MemInfo::soft_mem_limit() ||
- sys_mem_available() - bytes <
MemInfo::sys_mem_available_warning_water_mark();
+ auto rt = process_memory_usage() + bytes >= MemInfo::soft_mem_limit()
||
+ sys_mem_available() - bytes <
MemInfo::sys_mem_available_warning_water_mark();
+ if (rt) {
+
doris::ProcessProfile::instance()->memory_profile()->print_log_process_usage();
+ }
+ return rt;
}
static bool is_exceed_hard_mem_limit(int64_t bytes = 0) {
@@ -139,8 +143,12 @@ public:
// tcmalloc/jemalloc allocator cache does not participate in the mem
check as part of the process physical memory.
// because `new/malloc` will trigger mem hook when using
tcmalloc/jemalloc allocator cache,
// but it may not actually alloc physical memory, which is not
expected in mem hook fail.
- return process_memory_usage() + bytes >= MemInfo::mem_limit() ||
- sys_mem_available() - bytes <
MemInfo::sys_mem_available_low_water_mark();
+ auto rt = process_memory_usage() + bytes >= MemInfo::mem_limit() ||
+ sys_mem_available() - bytes <
MemInfo::sys_mem_available_low_water_mark();
+ if (rt) {
+
doris::ProcessProfile::instance()->memory_profile()->print_log_process_usage();
+ }
+ return rt;
}
static std::string process_mem_log_str() {
@@ -192,7 +200,7 @@ public:
}
private:
- static std::atomic<int64_t> _s_process_reserved_memory;
+ static std::atomic<int64_t> _process_reserved_memory;
};
} // namespace doris
diff --git a/be/src/runtime/memory/mem_tracker_limiter.cpp
b/be/src/runtime/memory/mem_tracker_limiter.cpp
index 78e66b6a579..05ff13f0e7c 100644
--- a/be/src/runtime/memory/mem_tracker_limiter.cpp
+++ b/be/src/runtime/memory/mem_tracker_limiter.cpp
@@ -33,27 +33,15 @@
#include "runtime/workload_group/workload_group.h"
#include "service/backend_options.h"
#include "util/mem_info.h"
-#include "util/perf_counters.h"
#include "util/runtime_profile.h"
namespace doris {
static bvar::Adder<int64_t>
memory_memtrackerlimiter_cnt("memory_memtrackerlimiter_cnt");
-static bvar::Adder<int64_t>
memory_all_trackers_sum_bytes("memory_all_trackers_sum_bytes");
-static bvar::Adder<int64_t>
memory_global_trackers_sum_bytes("memory_global_trackers_sum_bytes");
-static bvar::Adder<int64_t>
memory_query_trackers_sum_bytes("memory_query_trackers_sum_bytes");
-static bvar::Adder<int64_t>
memory_load_trackers_sum_bytes("memory_load_trackers_sum_bytes");
-static bvar::Adder<int64_t> memory_compaction_trackers_sum_bytes(
- "memory_compaction_trackers_sum_bytes");
-static bvar::Adder<int64_t> memory_schema_change_trackers_sum_bytes(
- "memory_schema_change_trackers_sum_bytes");
-static bvar::Adder<int64_t>
memory_other_trackers_sum_bytes("memory_other_trackers_sum_bytes");
std::atomic<long> mem_tracker_limiter_group_counter(0);
constexpr auto GC_MAX_SEEK_TRACKER = 1000;
-std::atomic<bool> MemTrackerLimiter::_enable_print_log_process_usage {true};
-
// Reset before each free
static std::unique_ptr<RuntimeProfile> free_top_memory_task_profile {
std::make_unique<RuntimeProfile>("-")};
@@ -75,6 +63,7 @@ MemTrackerLimiter::MemTrackerLimiter(Type type, const
std::string& label, int64_
_type = type;
_label = label;
_limit = byte_limit;
+ _uid = UniqueId::gen_uid();
if (_type == Type::GLOBAL) {
_group_num = 0;
} else {
@@ -216,87 +205,38 @@ std::string MemTrackerLimiter::print_address_sanitizers()
{
return detail;
}
-MemTrackerLimiter::Snapshot MemTrackerLimiter::make_snapshot() const {
- Snapshot snapshot;
- snapshot.type = type_string(_type);
- snapshot.label = _label;
- snapshot.limit = _limit;
- snapshot.cur_consumption = consumption();
- snapshot.peak_consumption = peak_consumption();
- return snapshot;
-}
-
-MemTrackerLimiter::Snapshot
MemTrackerLimiter::make_reserved_trackers_snapshot() const {
- Snapshot snapshot;
- snapshot.type = "reserved_memory";
- snapshot.label = _label;
- snapshot.limit = -1;
- snapshot.cur_consumption = reserved_consumption();
- snapshot.peak_consumption = reserved_peak_consumption();
- return snapshot;
-}
-
-void
MemTrackerLimiter::make_all_reserved_trackers_snapshots(std::vector<Snapshot>*
snapshots) {
- for (auto& i : ExecEnv::GetInstance()->mem_tracker_limiter_pool) {
- std::lock_guard<std::mutex> l(i.group_lock);
- for (auto trackerWptr : i.trackers) {
- auto tracker = trackerWptr.lock();
- if (tracker != nullptr && tracker->reserved_consumption() != 0) {
-
(*snapshots).emplace_back(tracker->make_reserved_trackers_snapshot());
- }
- }
+RuntimeProfile* MemTrackerLimiter::make_profile(RuntimeProfile* profile) const
{
+ RuntimeProfile* profile_snapshot = profile->create_child(
+ fmt::format("{}@{}@id={}", _label, type_string(_type),
_uid.to_string()), true, false);
+ RuntimeProfile::Counter* current_usage_counter =
+ ADD_COUNTER(profile_snapshot, "CurrentUsage", TUnit::BYTES);
+ RuntimeProfile::Counter* peak_usage_counter =
+ ADD_COUNTER(profile_snapshot, "PeakUsage", TUnit::BYTES);
+ COUNTER_SET(current_usage_counter, consumption());
+ COUNTER_SET(peak_usage_counter, peak_consumption());
+ if (has_limit()) {
+ RuntimeProfile::Counter* limit_counter =
+ ADD_COUNTER(profile_snapshot, "Limit", TUnit::BYTES);
+ COUNTER_SET(limit_counter, _limit);
+ }
+ if (reserved_peak_consumption() != 0) {
+ RuntimeProfile::Counter* reserved_counter =
+ ADD_COUNTER(profile_snapshot, "ReservedMemory", TUnit::BYTES);
+ RuntimeProfile::Counter* reserved_peak_counter =
+ ADD_COUNTER(profile_snapshot, "ReservedPeakMemory",
TUnit::BYTES);
+ COUNTER_SET(reserved_counter, reserved_consumption());
+ COUNTER_SET(reserved_peak_counter, reserved_peak_consumption());
}
+ return profile_snapshot;
}
-void MemTrackerLimiter::refresh_global_counter() {
- std::unordered_map<Type, int64_t> type_mem_sum = {
- {Type::GLOBAL, 0}, {Type::QUERY, 0}, {Type::LOAD, 0},
- {Type::COMPACTION, 0}, {Type::SCHEMA_CHANGE, 0}, {Type::OTHER, 0}};
- // always ExecEnv::ready(), because Daemon::_stop_background_threads_latch
- for (auto& group : ExecEnv::GetInstance()->mem_tracker_limiter_pool) {
- std::lock_guard<std::mutex> l(group.group_lock);
- for (auto trackerWptr : group.trackers) {
- auto tracker = trackerWptr.lock();
- if (tracker != nullptr) {
- type_mem_sum[tracker->type()] += tracker->consumption();
- }
- }
- }
- int64_t all_trackers_mem_sum = 0;
- for (auto it : type_mem_sum) {
- MemTrackerLimiter::TypeMemSum[it.first].set(it.second);
-
- all_trackers_mem_sum += it.second;
- switch (it.first) {
- case Type::GLOBAL:
- memory_global_trackers_sum_bytes
- << it.second -
memory_global_trackers_sum_bytes.get_value();
- break;
- case Type::QUERY:
- memory_query_trackers_sum_bytes
- << it.second - memory_query_trackers_sum_bytes.get_value();
- break;
- case Type::LOAD:
- memory_load_trackers_sum_bytes
- << it.second - memory_load_trackers_sum_bytes.get_value();
- break;
- case Type::COMPACTION:
- memory_compaction_trackers_sum_bytes
- << it.second -
memory_compaction_trackers_sum_bytes.get_value();
- break;
- case Type::SCHEMA_CHANGE:
- memory_schema_change_trackers_sum_bytes
- << it.second -
memory_schema_change_trackers_sum_bytes.get_value();
- break;
- case Type::OTHER:
- memory_other_trackers_sum_bytes
- << it.second - memory_other_trackers_sum_bytes.get_value();
- }
- }
- all_trackers_mem_sum += MemInfo::allocator_cache_mem();
- all_trackers_mem_sum += MemInfo::allocator_metadata_mem();
- memory_all_trackers_sum_bytes << all_trackers_mem_sum -
-
memory_all_trackers_sum_bytes.get_value();
+std::string MemTrackerLimiter::make_profile_str() const {
+ std::unique_ptr<RuntimeProfile> profile_snapshot =
+ std::make_unique<RuntimeProfile>("MemTrackerSnapshot");
+ make_profile(profile_snapshot.get());
+ std::stringstream ss;
+ profile_snapshot->pretty_print(&ss);
+ return ss.str();
}
void MemTrackerLimiter::clean_tracker_limiter_group() {
@@ -317,78 +257,15 @@ void MemTrackerLimiter::clean_tracker_limiter_group() {
#endif
}
-void MemTrackerLimiter::make_process_snapshots(std::vector<Snapshot>*
snapshots) {
- MemTrackerLimiter::refresh_global_counter();
- int64_t all_trackers_mem_sum = 0;
- Snapshot snapshot;
- for (const auto& it : MemTrackerLimiter::TypeMemSum) {
- snapshot.type = "overview";
- snapshot.label = type_string(it.first);
- snapshot.limit = -1;
- snapshot.cur_consumption = it.second.current_value();
- snapshot.peak_consumption = it.second.peak_value();
- (*snapshots).emplace_back(snapshot);
- all_trackers_mem_sum += it.second.current_value();
- }
-
- snapshot.type = "overview";
- snapshot.label = "tc/jemalloc_cache";
- snapshot.limit = -1;
- snapshot.cur_consumption = MemInfo::allocator_cache_mem();
- snapshot.peak_consumption = -1;
- (*snapshots).emplace_back(snapshot);
- all_trackers_mem_sum += MemInfo::allocator_cache_mem();
-
- snapshot.type = "overview";
- snapshot.label = "tc/jemalloc_metadata";
- snapshot.limit = -1;
- snapshot.cur_consumption = MemInfo::allocator_metadata_mem();
- snapshot.peak_consumption = -1;
- (*snapshots).emplace_back(snapshot);
- all_trackers_mem_sum += MemInfo::allocator_metadata_mem();
-
- snapshot.type = "overview";
- snapshot.label = "reserved_memory";
- snapshot.limit = -1;
- snapshot.cur_consumption =
GlobalMemoryArbitrator::process_reserved_memory();
- snapshot.peak_consumption = -1;
- (*snapshots).emplace_back(snapshot);
-
- snapshot.type = "overview";
- snapshot.label = "sum_of_all_trackers"; // is virtual memory
- snapshot.limit = -1;
- snapshot.cur_consumption = all_trackers_mem_sum;
- snapshot.peak_consumption = -1;
- (*snapshots).emplace_back(snapshot);
-
- snapshot.type = "overview";
-#ifdef ADDRESS_SANITIZER
- snapshot.label = "[ASAN]VmRSS(process resident memory)"; // from /proc
VmRSS VmHWM
-#else
- snapshot.label = "VmRSS(process resident memory)"; // from /proc VmRSS
VmHWM
-#endif
- snapshot.limit = -1;
- snapshot.cur_consumption = PerfCounters::get_vm_rss();
- snapshot.peak_consumption = PerfCounters::get_vm_hwm();
- (*snapshots).emplace_back(snapshot);
-
- snapshot.type = "overview";
- snapshot.label = "VmSize(process virtual memory)"; // from /proc VmSize
VmPeak
- snapshot.limit = -1;
- snapshot.cur_consumption = PerfCounters::get_vm_size();
- snapshot.peak_consumption = PerfCounters::get_vm_peak();
- (*snapshots).emplace_back(snapshot);
-}
-
-void MemTrackerLimiter::make_type_snapshots(std::vector<Snapshot>* snapshots,
- MemTrackerLimiter::Type type) {
+void MemTrackerLimiter::make_type_trackers_profile(RuntimeProfile* profile,
+ MemTrackerLimiter::Type
type) {
if (type == Type::GLOBAL) {
std::lock_guard<std::mutex> l(
ExecEnv::GetInstance()->mem_tracker_limiter_pool[0].group_lock);
for (auto trackerWptr :
ExecEnv::GetInstance()->mem_tracker_limiter_pool[0].trackers) {
auto tracker = trackerWptr.lock();
if (tracker != nullptr) {
- (*snapshots).emplace_back(tracker->make_snapshot());
+ tracker->make_profile(profile);
}
}
} else {
@@ -398,125 +275,80 @@ void
MemTrackerLimiter::make_type_snapshots(std::vector<Snapshot>* snapshots,
for (auto trackerWptr :
ExecEnv::GetInstance()->mem_tracker_limiter_pool[i].trackers) {
auto tracker = trackerWptr.lock();
if (tracker != nullptr && tracker->type() == type) {
- (*snapshots).emplace_back(tracker->make_snapshot());
+ tracker->make_profile(profile);
}
}
}
}
}
-void MemTrackerLimiter::make_top_consumption_snapshots(std::vector<Snapshot>*
snapshots,
- int top_num) {
- std::priority_queue<Snapshot> max_pq;
- // not include global type.
+std::string
MemTrackerLimiter::make_type_trackers_profile_str(MemTrackerLimiter::Type type)
{
+ std::unique_ptr<RuntimeProfile> profile_snapshot =
+ std::make_unique<RuntimeProfile>("TypeMemTrackersSnapshot");
+ make_type_trackers_profile(profile_snapshot.get(), type);
+ std::stringstream ss;
+ profile_snapshot->pretty_print(&ss);
+ return ss.str();
+}
+
+void
MemTrackerLimiter::make_top_consumption_tasks_tracker_profile(RuntimeProfile*
profile,
+ int
top_num) {
+ std::unique_ptr<RuntimeProfile> tmp_profile_snapshot =
+ std::make_unique<RuntimeProfile>("tmpSnapshot");
+ std::priority_queue<std::pair<int64_t, RuntimeProfile*>> max_pq;
+    // start from 1, pool[0] holds the global type trackers, skip them.
for (unsigned i = 1; i <
ExecEnv::GetInstance()->mem_tracker_limiter_pool.size(); ++i) {
std::lock_guard<std::mutex> l(
ExecEnv::GetInstance()->mem_tracker_limiter_pool[i].group_lock);
for (auto trackerWptr :
ExecEnv::GetInstance()->mem_tracker_limiter_pool[i].trackers) {
auto tracker = trackerWptr.lock();
if (tracker != nullptr) {
- max_pq.emplace(tracker->make_snapshot());
+ auto* profile_snapshot =
tracker->make_profile(tmp_profile_snapshot.get());
+ max_pq.emplace(tracker->consumption(), profile_snapshot);
}
}
}
while (!max_pq.empty() && top_num > 0) {
- (*snapshots).emplace_back(max_pq.top());
+ RuntimeProfile* profile_snapshot =
+ profile->create_child(max_pq.top().second->name(), true,
false);
+ profile_snapshot->merge(max_pq.top().second);
top_num--;
max_pq.pop();
}
}
-void MemTrackerLimiter::make_all_trackers_snapshots(std::vector<Snapshot>*
snapshots) {
- for (auto& i : ExecEnv::GetInstance()->mem_tracker_limiter_pool) {
- std::lock_guard<std::mutex> l(i.group_lock);
- for (auto trackerWptr : i.trackers) {
- auto tracker = trackerWptr.lock();
- if (tracker != nullptr) {
- (*snapshots).emplace_back(tracker->make_snapshot());
- }
- }
- }
-}
-
-void MemTrackerLimiter::make_all_memory_state_snapshots(std::vector<Snapshot>*
snapshots) {
- make_process_snapshots(snapshots);
- make_all_trackers_snapshots(snapshots);
- make_all_reserved_trackers_snapshots(snapshots);
-}
-
-std::string MemTrackerLimiter::log_usage(Snapshot snapshot) {
- return fmt::format("MemTracker Label={}, Type={}, Limit={}({} B),
Used={}({} B), Peak={}({} B)",
- snapshot.label, snapshot.type,
MemCounter::print_bytes(snapshot.limit),
- snapshot.limit,
MemCounter::print_bytes(snapshot.cur_consumption),
- snapshot.cur_consumption,
MemCounter::print_bytes(snapshot.peak_consumption),
- snapshot.peak_consumption);
-}
+void MemTrackerLimiter::make_all_tasks_tracker_profile(RuntimeProfile*
profile) {
+ std::unordered_map<Type, RuntimeProfile*> types_profile;
+ types_profile[Type::QUERY] = profile->create_child("QueryTasks", true,
false);
+ types_profile[Type::LOAD] = profile->create_child("LoadTasks", true,
false);
+ types_profile[Type::COMPACTION] = profile->create_child("CompactionTasks",
true, false);
+ types_profile[Type::SCHEMA_CHANGE] =
profile->create_child("SchemaChangeTasks", true, false);
+ types_profile[Type::OTHER] = profile->create_child("OtherTasks", true,
false);
-std::string MemTrackerLimiter::type_log_usage(Snapshot snapshot) {
- return fmt::format("Type={}, Used={}({} B), Peak={}({} B)", snapshot.type,
- MemCounter::print_bytes(snapshot.cur_consumption),
snapshot.cur_consumption,
- MemCounter::print_bytes(snapshot.peak_consumption),
- snapshot.peak_consumption);
-}
-
-std::string MemTrackerLimiter::type_detail_usage(const std::string& msg, Type
type) {
- std::string detail = fmt::format("{}, Type:{}, Memory Tracker Summary",
msg, type_string(type));
+    // start from 1, pool[0] holds the global type trackers, skip them.
for (unsigned i = 1; i <
ExecEnv::GetInstance()->mem_tracker_limiter_pool.size(); ++i) {
std::lock_guard<std::mutex> l(
ExecEnv::GetInstance()->mem_tracker_limiter_pool[i].group_lock);
for (auto trackerWptr :
ExecEnv::GetInstance()->mem_tracker_limiter_pool[i].trackers) {
auto tracker = trackerWptr.lock();
- if (tracker != nullptr && tracker->type() == type) {
- detail += "\n " +
MemTrackerLimiter::log_usage(tracker->make_snapshot());
+ if (tracker != nullptr) {
+ tracker->make_profile(types_profile[tracker->type()]);
}
}
}
- return detail;
}
void MemTrackerLimiter::print_log_usage(const std::string& msg) {
if (_enable_print_log_usage) {
_enable_print_log_usage = false;
std::string detail = msg;
- detail += "\nProcess Memory Summary:\n " +
GlobalMemoryArbitrator::process_mem_log_str();
- detail += "\nMemory Tracker Summary: " + log_usage();
+ detail += "\nProcess Memory Summary: " +
GlobalMemoryArbitrator::process_mem_log_str();
+ detail += "\n" + make_profile_str();
LOG(WARNING) << detail;
}
}
-std::string MemTrackerLimiter::log_process_usage_str() {
- std::string detail;
- detail += "\nProcess Memory Summary:\n " +
GlobalMemoryArbitrator::process_mem_log_str();
- std::vector<Snapshot> snapshots;
- MemTrackerLimiter::make_process_snapshots(&snapshots);
- MemTrackerLimiter::make_type_snapshots(&snapshots,
MemTrackerLimiter::Type::GLOBAL);
- MemTrackerLimiter::make_top_consumption_snapshots(&snapshots, 15);
- MemTrackerLimiter::make_all_reserved_trackers_snapshots(&snapshots);
-
- detail += "\nMemory Tracker Summary:";
- for (const auto& snapshot : snapshots) {
- if (snapshot.label.empty()) {
- detail += "\n " + MemTrackerLimiter::type_log_usage(snapshot);
- } else {
- detail += "\n " + MemTrackerLimiter::log_usage(snapshot);
- }
- }
-
- // Add additional tracker printed when memory exceeds limit.
- detail += "\n " +
-
ExecEnv::GetInstance()->memtable_memory_limiter()->mem_tracker()->log_usage();
- return detail;
-}
-
-void MemTrackerLimiter::print_log_process_usage() {
- // The default interval between two prints is 100ms
(config::memory_maintenance_sleep_time_ms).
- if (MemTrackerLimiter::_enable_print_log_process_usage) {
- MemTrackerLimiter::_enable_print_log_process_usage = false;
- LOG(WARNING) << log_process_usage_str();
- }
-}
-
std::string MemTrackerLimiter::tracker_limit_exceeded_str() {
std::string err_msg = fmt::format(
"memory tracker limit exceeded, tracker label:{}, type:{}, limit "
diff --git a/be/src/runtime/memory/mem_tracker_limiter.h
b/be/src/runtime/memory/mem_tracker_limiter.h
index 251a7c25a74..445856b1f6a 100644
--- a/be/src/runtime/memory/mem_tracker_limiter.h
+++ b/be/src/runtime/memory/mem_tracker_limiter.h
@@ -85,27 +85,47 @@ public:
OTHER = 5,
};
- struct Snapshot {
- std::string type;
- std::string label;
- int64_t limit = 0;
- int64_t cur_consumption = 0;
- int64_t peak_consumption = 0;
-
- bool operator<(const Snapshot& rhs) const { return cur_consumption <
rhs.cur_consumption; }
- };
+ static std::string type_string(Type type) {
+ switch (type) {
+ case Type::GLOBAL:
+ return "global";
+ case Type::QUERY:
+ return "query";
+ case Type::LOAD:
+ return "load";
+ case Type::COMPACTION:
+ return "compaction";
+ case Type::SCHEMA_CHANGE:
+ return "schema_change";
+ case Type::OTHER:
+ return "other";
+ default:
+ LOG(FATAL) << "not match type of mem tracker limiter :" <<
static_cast<int>(type);
+ }
+ LOG(FATAL) << "__builtin_unreachable";
+ __builtin_unreachable();
+ }
- // Corresponding to MemTrackerLimiter::Type.
- // MemCounter contains atomic variables, which are not allowed to be
copied or moved.
- inline static std::unordered_map<Type, MemCounter> TypeMemSum;
+ static std::string gc_type_string(GCType type) {
+ switch (type) {
+ case GCType::PROCESS:
+ return "process";
+ case GCType::WORK_LOAD_GROUP:
+ return "work load group";
+ default:
+ LOG(FATAL) << "not match gc type:" << static_cast<int>(type);
+ }
+ LOG(FATAL) << "__builtin_unreachable";
+ __builtin_unreachable();
+ }
/*
* Part 2, Constructors and property methods
*/
- static std::shared_ptr<MemTrackerLimiter> create_shared(
- MemTrackerLimiter::Type type, const std::string& label =
std::string(),
- int64_t byte_limit = -1);
+ static std::shared_ptr<MemTrackerLimiter>
create_shared(MemTrackerLimiter::Type type,
+ const std::string&
label,
+ int64_t byte_limit
= -1);
// byte_limit equal to -1 means no consumption limit, only participate in
process memory statistics.
MemTrackerLimiter(Type type, const std::string& label, int64_t byte_limit);
@@ -119,10 +139,14 @@ public:
int64_t limit() const { return _limit; }
bool limit_exceeded() const { return _limit >= 0 && _limit <
consumption(); }
Status check_limit(int64_t bytes = 0);
+ // Log the memory usage when memory limit is exceeded.
+ std::string tracker_limit_exceeded_str();
bool is_overcommit_tracker() const { return type() == Type::QUERY ||
type() == Type::LOAD; }
bool is_query_cancelled() { return _is_query_cancelled; }
void set_is_query_cancelled(bool is_cancelled) {
_is_query_cancelled.store(is_cancelled); }
+ static void clean_tracker_limiter_group();
+
/*
* Part 3, Memory tracking method (use carefully!)
*
@@ -197,36 +221,18 @@ public:
DCHECK(reserved_consumption() >= 0);
}
- Snapshot make_reserved_trackers_snapshot() const;
- static void make_all_reserved_trackers_snapshots(std::vector<Snapshot>*
snapshots);
-
/*
- * Part 4, Memory snapshot and log method
+ * Part 4, Memory profile and log method
*/
+ RuntimeProfile* make_profile(RuntimeProfile* profile) const;
+ std::string make_profile_str() const;
+ static void make_type_trackers_profile(RuntimeProfile* profile,
MemTrackerLimiter::Type type);
+ static std::string make_type_trackers_profile_str(MemTrackerLimiter::Type
type);
+ static void make_top_consumption_tasks_tracker_profile(RuntimeProfile*
profile, int top_num);
+ static void make_all_tasks_tracker_profile(RuntimeProfile* profile);
- static void refresh_global_counter();
- static void clean_tracker_limiter_group();
-
- Snapshot make_snapshot() const;
- // Returns a list of all the valid tracker snapshots.
- static void make_process_snapshots(std::vector<Snapshot>* snapshots);
- static void make_type_snapshots(std::vector<Snapshot>* snapshots, Type
type);
- static void make_all_trackers_snapshots(std::vector<Snapshot>* snapshots);
- static void make_all_memory_state_snapshots(std::vector<Snapshot>*
snapshots);
- static void make_top_consumption_snapshots(std::vector<Snapshot>*
snapshots, int top_num);
-
- static std::string log_usage(Snapshot snapshot);
- std::string log_usage() const { return log_usage(make_snapshot()); }
- static std::string type_log_usage(Snapshot snapshot);
- static std::string type_detail_usage(const std::string& msg, Type type);
void print_log_usage(const std::string& msg);
void enable_print_log_usage() { _enable_print_log_usage = true; }
- // process memory changes more than 256M, or the GC ends
- static void enable_print_log_process_usage() {
_enable_print_log_process_usage = true; }
- static std::string log_process_usage_str();
- static void print_log_process_usage();
- // Log the memory usage when memory limit is exceeded.
- std::string tracker_limit_exceeded_str();
/*
* Part 5, Memory GC method
@@ -270,44 +276,6 @@ public:
bool is_group_commit_load {false};
private:
- /*
- * Part 7, Private method
- */
-
- static std::string type_string(Type type) {
- switch (type) {
- case Type::GLOBAL:
- return "global";
- case Type::QUERY:
- return "query";
- case Type::LOAD:
- return "load";
- case Type::COMPACTION:
- return "compaction";
- case Type::SCHEMA_CHANGE:
- return "schema_change";
- case Type::OTHER:
- return "other";
- default:
- LOG(FATAL) << "not match type of mem tracker limiter :" <<
static_cast<int>(type);
- }
- LOG(FATAL) << "__builtin_unreachable";
- __builtin_unreachable();
- }
-
- static std::string gc_type_string(GCType type) {
- switch (type) {
- case GCType::PROCESS:
- return "process";
- case GCType::WORK_LOAD_GROUP:
- return "work load group";
- default:
- LOG(FATAL) << "not match gc type:" << static_cast<int>(type);
- }
- LOG(FATAL) << "__builtin_unreachable";
- __builtin_unreachable();
- }
-
// only for Type::QUERY or Type::LOAD.
static TUniqueId label_to_queryid(const std::string& label) {
if (label.find("#Id=") == std::string::npos) {
@@ -332,6 +300,8 @@ private:
// label used in the make snapshot, not guaranteed unique.
std::string _label;
+    // Appended to the runtime profile name in make_profile(), which must be unique.
+ UniqueId _uid;
MemCounter _mem_counter;
MemCounter _reserved_counter;
@@ -351,7 +321,6 @@ private:
// Avoid frequent printing.
bool _enable_print_log_usage = false;
- static std::atomic<bool> _enable_print_log_process_usage;
std::shared_ptr<QueryStatistics> _query_statistics = nullptr;
diff --git a/be/src/runtime/memory/memory_profile.cpp
b/be/src/runtime/memory/memory_profile.cpp
new file mode 100644
index 00000000000..8dbdcbdd3af
--- /dev/null
+++ b/be/src/runtime/memory/memory_profile.cpp
@@ -0,0 +1,353 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "runtime/memory/memory_profile.h"
+
+#include "bvar/reducer.h"
+#include "runtime/exec_env.h"
+#include "runtime/memory/global_memory_arbitrator.h"
+#include "runtime/memory/mem_tracker_limiter.h"
+#include "util/mem_info.h"
+#include "util/runtime_profile.h"
+
+namespace doris {
+
+static bvar::Adder<int64_t>
memory_all_tracked_sum_bytes("memory_all_tracked_sum_bytes");
+static bvar::Adder<int64_t>
memory_global_trackers_sum_bytes("memory_global_trackers_sum_bytes");
+static bvar::Adder<int64_t>
memory_query_trackers_sum_bytes("memory_query_trackers_sum_bytes");
+static bvar::Adder<int64_t>
memory_load_trackers_sum_bytes("memory_load_trackers_sum_bytes");
+static bvar::Adder<int64_t> memory_compaction_trackers_sum_bytes(
+ "memory_compaction_trackers_sum_bytes");
+static bvar::Adder<int64_t> memory_schema_change_trackers_sum_bytes(
+ "memory_schema_change_trackers_sum_bytes");
+static bvar::Adder<int64_t>
memory_other_trackers_sum_bytes("memory_other_trackers_sum_bytes");
+static bvar::Adder<int64_t>
memory_reserved_memory_bytes("memory_reserved_memory_bytes");
+static bvar::Adder<int64_t>
memory_all_tasks_memory_bytes("memory_all_tasks_memory_bytes");
+static bvar::Adder<int64_t>
memory_untracked_memory_bytes("memory_untracked_memory_bytes");
+
+MemoryProfile::MemoryProfile() {
+
_memory_overview_profile.set(std::make_unique<RuntimeProfile>("MemoryOverviewSnapshot"));
+
_global_memory_profile.set(std::make_unique<RuntimeProfile>("GlobalMemorySnapshot"));
+
_top_memory_tasks_profile.set(std::make_unique<RuntimeProfile>("TopMemoryTasksSnapshot"));
+
_tasks_memory_profile.set(std::make_unique<RuntimeProfile>("TasksMemorySnapshot"));
+}
+
+void MemoryProfile::refresh_memory_overview_profile() {
+#ifdef ADDRESS_SANITIZER
+ std::unique_ptr<RuntimeProfile> memory_overview_profile =
+ std::make_unique<RuntimeProfile>("[ASAN]MemoryOverviewSnapshot");
+#else
+ std::unique_ptr<RuntimeProfile> memory_overview_profile =
+ std::make_unique<RuntimeProfile>("MemoryOverviewSnapshot");
+#endif
+ std::unique_ptr<RuntimeProfile> global_memory_profile =
+ std::make_unique<RuntimeProfile>("GlobalMemorySnapshot");
+ std::unique_ptr<RuntimeProfile> top_memory_tasks_profile =
+ std::make_unique<RuntimeProfile>("TopMemoryTasksSnapshot");
+
+ // 1. create profile
+ RuntimeProfile* untracked_memory_profile =
+ memory_overview_profile->create_child("UntrackedMemory", true,
false);
+ RuntimeProfile* tracked_memory_profile =
+ memory_overview_profile->create_child("TrackedMemory", true,
false);
+ RuntimeProfile* tasks_memory_overview_profile =
+ tracked_memory_profile->create_child("TasksMemory", true, false);
+ RuntimeProfile* tasks_memory_overview_details_profile =
+ tasks_memory_overview_profile->create_child("Details", true,
false);
+ RuntimeProfile* global_memory_overview_profile =
+ tracked_memory_profile->create_child("GlobalMemory", true, false);
+ RuntimeProfile* jemalloc_memory_profile =
+ tracked_memory_profile->create_child("JemallocMemory", true,
false);
+ RuntimeProfile* jemalloc_memory_details_profile =
+ jemalloc_memory_profile->create_child("Details", true, false);
+
+ // 2. add counter
+ // 2.1 add process memory counter
+ RuntimeProfile::Counter* process_physical_memory_current_usage_counter =
+ ADD_COUNTER(memory_overview_profile, "PhysicalMemory(VmRSS)",
TUnit::BYTES);
+ RuntimeProfile::Counter* process_physical_memory_peak_usage_counter =
+
memory_overview_profile->AddHighWaterMarkCounter("PhysicalMemoryPeak",
TUnit::BYTES);
+ RuntimeProfile::Counter* process_virtual_memory_current_usage_counter =
+ ADD_COUNTER(memory_overview_profile, "VirtualMemory(VmSize)",
TUnit::BYTES);
+ RuntimeProfile::Counter* process_virtual_memory_peak_usage_counter =
+
memory_overview_profile->AddHighWaterMarkCounter("VirtualMemoryPeak",
TUnit::BYTES);
+
+ // 2.2 add untracked memory counter
+ RuntimeProfile::Counter* untracked_memory_current_usage_counter =
+ ADD_COUNTER(untracked_memory_profile, "CurrentUsage",
TUnit::BYTES);
+ RuntimeProfile::Counter* untracked_memory_peak_usage_counter =
+ untracked_memory_profile->AddHighWaterMarkCounter("PeakUsage",
TUnit::BYTES);
+
+ // 2.3 add tracked memory counter
+ RuntimeProfile::Counter* tracked_memory_current_usage_counter =
+ ADD_COUNTER(tracked_memory_profile, "CurrentUsage", TUnit::BYTES);
+ RuntimeProfile::Counter* tracked_memory_peak_usage_counter =
+ tracked_memory_profile->AddHighWaterMarkCounter("PeakUsage",
TUnit::BYTES);
+
+ // 2.4 add jemalloc memory counter
+ RuntimeProfile::Counter* jemalloc_memory_current_usage_counter =
+ ADD_COUNTER(jemalloc_memory_profile, "CurrentUsage", TUnit::BYTES);
+ RuntimeProfile::Counter* jemalloc_memory_peak_usage_counter =
+ jemalloc_memory_profile->AddHighWaterMarkCounter("PeakUsage",
TUnit::BYTES);
+ RuntimeProfile::Counter* jemalloc_cache_current_usage_counter =
+ ADD_COUNTER(jemalloc_memory_details_profile, "Cache",
TUnit::BYTES);
+ RuntimeProfile::Counter* jemalloc_cache_peak_usage_counter =
+
jemalloc_memory_details_profile->AddHighWaterMarkCounter("CachePeak",
TUnit::BYTES);
+ RuntimeProfile::Counter* jemalloc_metadata_current_usage_counter =
+ ADD_COUNTER(jemalloc_memory_details_profile, "Metadata",
TUnit::BYTES);
+ RuntimeProfile::Counter* jemalloc_metadata_peak_usage_counter =
+
jemalloc_memory_details_profile->AddHighWaterMarkCounter("MetadataPeak",
TUnit::BYTES);
+
+ // 2.5 add global memory counter
+ RuntimeProfile::Counter* global_current_usage_counter =
+ ADD_COUNTER(global_memory_overview_profile, "CurrentUsage",
TUnit::BYTES);
+ RuntimeProfile::Counter* global_peak_usage_counter =
+
global_memory_overview_profile->AddHighWaterMarkCounter("PeakUsage",
TUnit::BYTES);
+
+ // 2.6 add tasks memory counter
+ RuntimeProfile::Counter* tasks_memory_current_usage_counter =
+ ADD_COUNTER_WITH_LEVEL(tasks_memory_overview_profile,
"CurrentUsage", TUnit::BYTES, 1);
+    // Reserved memory is the sum of all tasks' reserved memory; it is also
+    // counted in the tasks memory counter (the two values overlap).
+ RuntimeProfile::Counter* reserved_memory_current_usage_counter =
ADD_CHILD_COUNTER_WITH_LEVEL(
+ tasks_memory_overview_profile, "ReservedMemory", TUnit::BYTES,
"CurrentUsage", 1);
+ RuntimeProfile::Counter* reserved_memory_peak_usage_counter =
+
tasks_memory_overview_profile->AddHighWaterMarkCounter("ReservedMemoryPeak",
+
TUnit::BYTES, "CurrentUsage", 1);
+ RuntimeProfile::Counter* tasks_memory_peak_usage_counter =
+
tasks_memory_overview_profile->AddHighWaterMarkCounter("PeakUsage",
TUnit::BYTES);
+ RuntimeProfile::Counter* query_current_usage_counter =
+ ADD_COUNTER_WITH_LEVEL(tasks_memory_overview_details_profile,
"Query", TUnit::BYTES, 1);
+ RuntimeProfile::Counter* query_peak_usage_counter =
+ tasks_memory_overview_details_profile->AddHighWaterMarkCounter(
+ "QueryPeak", TUnit::BYTES, "Query", 1);
+ RuntimeProfile::Counter* load_current_usage_counter =
+ ADD_COUNTER_WITH_LEVEL(tasks_memory_overview_details_profile,
"Load", TUnit::BYTES, 1);
+ RuntimeProfile::Counter* load_peak_usage_counter =
+
tasks_memory_overview_details_profile->AddHighWaterMarkCounter("LoadPeak",
TUnit::BYTES,
+
"Load", 1);
+ RuntimeProfile::Counter* load_all_memtables_current_usage_counter =
+ ADD_CHILD_COUNTER_WITH_LEVEL(tasks_memory_overview_details_profile,
+ "AllMemTablesMemory", TUnit::BYTES,
"Load", 1);
+ RuntimeProfile::Counter* load_all_memtables_peak_usage_counter =
+ ADD_CHILD_COUNTER_WITH_LEVEL(tasks_memory_overview_details_profile,
+ "AllMemTablesMemoryPeak",
TUnit::BYTES, "Load", 1);
+ RuntimeProfile::Counter* compaction_current_usage_counter =
ADD_COUNTER_WITH_LEVEL(
+ tasks_memory_overview_details_profile, "Compaction", TUnit::BYTES,
1);
+ RuntimeProfile::Counter* compaction_peak_usage_counter =
+ tasks_memory_overview_details_profile->AddHighWaterMarkCounter(
+ "CompactionPeak", TUnit::BYTES, "Compaction", 1);
+ RuntimeProfile::Counter* schema_change_current_usage_counter =
ADD_COUNTER_WITH_LEVEL(
+ tasks_memory_overview_details_profile, "SchemaChange",
TUnit::BYTES, 1);
+ RuntimeProfile::Counter* schema_change_peak_usage_counter =
+ tasks_memory_overview_details_profile->AddHighWaterMarkCounter(
+ "SchemaChangePeak", TUnit::BYTES, "SchemaChange", 1);
+ RuntimeProfile::Counter* other_current_usage_counter =
+ ADD_COUNTER_WITH_LEVEL(tasks_memory_overview_details_profile,
"Other", TUnit::BYTES, 1);
+ RuntimeProfile::Counter* other_peak_usage_counter =
+ tasks_memory_overview_details_profile->AddHighWaterMarkCounter(
+ "OtherPeak", TUnit::BYTES, "Other", 1);
+ // 3. refresh counter
+ // 3.1 refresh process memory counter
+ COUNTER_SET(process_physical_memory_current_usage_counter,
+ PerfCounters::get_vm_rss()); // from /proc VmRSS VmHWM
+ COUNTER_SET(process_physical_memory_peak_usage_counter,
PerfCounters::get_vm_hwm());
+ COUNTER_SET(process_virtual_memory_current_usage_counter,
+ PerfCounters::get_vm_size()); // from /proc VmSize VmPeak
+ COUNTER_SET(process_virtual_memory_peak_usage_counter,
PerfCounters::get_vm_peak());
+
+ // 3.2 refresh tracked memory counter
+ std::unordered_map<MemTrackerLimiter::Type, int64_t> type_mem_sum = {
+ {MemTrackerLimiter::Type::GLOBAL, 0},
{MemTrackerLimiter::Type::QUERY, 0},
+ {MemTrackerLimiter::Type::LOAD, 0},
{MemTrackerLimiter::Type::COMPACTION, 0},
+ {MemTrackerLimiter::Type::SCHEMA_CHANGE, 0},
{MemTrackerLimiter::Type::OTHER, 0}};
+ // always ExecEnv::ready(), because Daemon::_stop_background_threads_latch
+ for (auto& group : ExecEnv::GetInstance()->mem_tracker_limiter_pool) {
+ std::lock_guard<std::mutex> l(group.group_lock);
+ for (auto trackerWptr : group.trackers) {
+ auto tracker = trackerWptr.lock();
+ if (tracker != nullptr) {
+ type_mem_sum[tracker->type()] += tracker->consumption();
+ }
+ }
+ }
+
+ int64_t all_tracked_mem_sum = 0;
+ int64_t tasks_trackers_mem_sum = 0;
+ for (auto it : type_mem_sum) {
+ all_tracked_mem_sum += it.second;
+ switch (it.first) {
+ case MemTrackerLimiter::Type::GLOBAL:
+ COUNTER_SET(global_current_usage_counter, it.second);
+ COUNTER_SET(global_peak_usage_counter, it.second);
+ memory_global_trackers_sum_bytes
+ << it.second -
memory_global_trackers_sum_bytes.get_value();
+ break;
+ case MemTrackerLimiter::Type::QUERY:
+ COUNTER_SET(query_current_usage_counter, it.second);
+ COUNTER_SET(query_peak_usage_counter, it.second);
+ tasks_trackers_mem_sum += it.second;
+ memory_query_trackers_sum_bytes
+ << it.second - memory_query_trackers_sum_bytes.get_value();
+ break;
+ case MemTrackerLimiter::Type::LOAD:
+ COUNTER_SET(load_current_usage_counter, it.second);
+ COUNTER_SET(load_peak_usage_counter, it.second);
+ tasks_trackers_mem_sum += it.second;
+ memory_load_trackers_sum_bytes
+ << it.second - memory_load_trackers_sum_bytes.get_value();
+ break;
+ case MemTrackerLimiter::Type::COMPACTION:
+ COUNTER_SET(compaction_current_usage_counter, it.second);
+ COUNTER_SET(compaction_peak_usage_counter, it.second);
+ tasks_trackers_mem_sum += it.second;
+ memory_compaction_trackers_sum_bytes
+ << it.second -
memory_compaction_trackers_sum_bytes.get_value();
+ break;
+ case MemTrackerLimiter::Type::SCHEMA_CHANGE:
+ COUNTER_SET(schema_change_current_usage_counter, it.second);
+ COUNTER_SET(schema_change_peak_usage_counter, it.second);
+ tasks_trackers_mem_sum += it.second;
+ memory_schema_change_trackers_sum_bytes
+ << it.second -
memory_schema_change_trackers_sum_bytes.get_value();
+ break;
+ case MemTrackerLimiter::Type::OTHER:
+ COUNTER_SET(other_current_usage_counter, it.second);
+ COUNTER_SET(other_peak_usage_counter, it.second);
+ tasks_trackers_mem_sum += it.second;
+ memory_other_trackers_sum_bytes
+ << it.second - memory_other_trackers_sum_bytes.get_value();
+ }
+ }
+
+ MemTrackerLimiter::make_type_trackers_profile(global_memory_profile.get(),
+
MemTrackerLimiter::Type::GLOBAL);
+
+
MemTrackerLimiter::make_top_consumption_tasks_tracker_profile(top_memory_tasks_profile.get(),
+ 15);
+
+ COUNTER_SET(tasks_memory_current_usage_counter, tasks_trackers_mem_sum);
+ COUNTER_SET(tasks_memory_peak_usage_counter, tasks_trackers_mem_sum);
+ memory_all_tasks_memory_bytes << tasks_trackers_mem_sum -
+
memory_all_tasks_memory_bytes.get_value();
+
+ COUNTER_SET(reserved_memory_current_usage_counter,
+ GlobalMemoryArbitrator::process_reserved_memory());
+ COUNTER_SET(reserved_memory_peak_usage_counter,
+ GlobalMemoryArbitrator::process_reserved_memory());
+ memory_reserved_memory_bytes <<
GlobalMemoryArbitrator::process_reserved_memory() -
+
memory_reserved_memory_bytes.get_value();
+
+ all_tracked_mem_sum += MemInfo::allocator_cache_mem();
+ COUNTER_SET(jemalloc_cache_current_usage_counter,
+ static_cast<int64_t>(MemInfo::allocator_cache_mem()));
+ COUNTER_SET(jemalloc_cache_peak_usage_counter,
+ static_cast<int64_t>(MemInfo::allocator_cache_mem()));
+ all_tracked_mem_sum += MemInfo::allocator_metadata_mem();
+ COUNTER_SET(jemalloc_metadata_current_usage_counter,
+ static_cast<int64_t>(MemInfo::allocator_metadata_mem()));
+ COUNTER_SET(jemalloc_metadata_peak_usage_counter,
+ static_cast<int64_t>(MemInfo::allocator_metadata_mem()));
+ COUNTER_SET(jemalloc_memory_current_usage_counter,
+ jemalloc_cache_current_usage_counter->value() +
+ jemalloc_metadata_current_usage_counter->value());
+ COUNTER_SET(jemalloc_memory_peak_usage_counter,
+ jemalloc_cache_current_usage_counter->value() +
+ jemalloc_metadata_current_usage_counter->value());
+
+ COUNTER_SET(tracked_memory_current_usage_counter, all_tracked_mem_sum);
+ COUNTER_SET(tracked_memory_peak_usage_counter, all_tracked_mem_sum);
+ memory_all_tracked_sum_bytes << all_tracked_mem_sum -
memory_all_tracked_sum_bytes.get_value();
+
+ // 3.3 refresh untracked memory counter
+ int64_t untracked_memory =
+ process_physical_memory_current_usage_counter->value() -
all_tracked_mem_sum;
+ COUNTER_SET(untracked_memory_current_usage_counter, untracked_memory);
+ COUNTER_SET(untracked_memory_peak_usage_counter, untracked_memory);
+ memory_untracked_memory_bytes << untracked_memory -
memory_untracked_memory_bytes.get_value();
+
+ // 3.4 refresh additional tracker printed when memory exceeds limit.
+ COUNTER_SET(load_all_memtables_current_usage_counter,
+
ExecEnv::GetInstance()->memtable_memory_limiter()->mem_tracker()->consumption());
+ COUNTER_SET(
+ load_all_memtables_peak_usage_counter,
+
ExecEnv::GetInstance()->memtable_memory_limiter()->mem_tracker()->peak_consumption());
+
+ // 4. reset profile
+ _memory_overview_profile.set(std::move(memory_overview_profile));
+ _global_memory_profile.set(std::move(global_memory_profile));
+ _top_memory_tasks_profile.set(std::move(top_memory_tasks_profile));
+}
+
+// Rebuilds the "AllTasksMemorySnapshot" profile and publishes it.
+// The snapshot is assembled in a private object first and only then handed to
+// MultiVersion::set(), so readers never see a half-built profile.
+void MemoryProfile::refresh_tasks_memory_profile() {
+    auto snapshot = std::make_unique<RuntimeProfile>("AllTasksMemorySnapshot");
+    MemTrackerLimiter::make_all_tasks_tracker_profile(snapshot.get());
+    _tasks_memory_profile.set(std::move(snapshot));
+}
+
+// Adds a "MemoryProfile" child to `profile` and, under it, one child per
+// published snapshot, in this order: memory overview, global memory,
+// top memory tasks, all tasks. Each child takes the snapshot's name and has
+// the snapshot merged into it.
+void MemoryProfile::make_memory_profile(RuntimeProfile* profile) const {
+    RuntimeProfile* memory_profile_snapshot = profile->create_child("MemoryProfile", true, false);
+
+    // Copies the latest version held by `source` into a new same-named child.
+    // RuntimeProfile::merge() takes a non-const pointer, hence the const_cast on the
+    // published (const) snapshot. NOTE(review): assumes merge() only reads its argument.
+    auto append_snapshot = [memory_profile_snapshot](const MultiVersion<RuntimeProfile>& source) {
+        auto version_ptr = source.get();
+        RuntimeProfile* child =
+                memory_profile_snapshot->create_child(version_ptr->name(), true, false);
+        child->merge(const_cast<RuntimeProfile*>(version_ptr.get()));
+    };
+
+    append_snapshot(_memory_overview_profile);
+    append_snapshot(_global_memory_profile);
+    append_snapshot(_top_memory_tasks_profile);
+    append_snapshot(_tasks_memory_profile);
+}
+
+// Per-type tracked memory sums. Each value is the one stored into the matching
+// bvar by the last refresh_memory_overview_profile() call; these getters do not
+// walk the trackers themselves.
+int64_t MemoryProfile::query_current_usage() { return memory_query_trackers_sum_bytes.get_value(); }
+
+int64_t MemoryProfile::load_current_usage() { return memory_load_trackers_sum_bytes.get_value(); }
+
+int64_t MemoryProfile::compaction_current_usage() {
+    return memory_compaction_trackers_sum_bytes.get_value();
+}
+
+int64_t MemoryProfile::schema_change_current_usage() {
+    return memory_schema_change_trackers_sum_bytes.get_value();
+}
+
+int64_t MemoryProfile::other_current_usage() { return memory_other_trackers_sum_bytes.get_value(); }
+
+// Logs the process memory summary followed by the overview, global and
+// top-tasks profiles, then disarms itself until enable_print_log_process_usage()
+// is called again.
+//
+// exchange(false) makes "is it armed?" and "disarm it" one atomic step. With a
+// separate load and store, several threads hitting an allocation failure at the
+// same time could all see `true` and each dump the whole (large) profile set.
+void MemoryProfile::print_log_process_usage() {
+    if (!_enable_print_log_process_usage.exchange(false)) {
+        return;
+    }
+    LOG(WARNING) << "Process Memory Summary: " << GlobalMemoryArbitrator::process_mem_log_str();
+    LOG(WARNING) << "\n" << print_memory_overview_profile();
+    LOG(WARNING) << "\n" << print_global_memory_profile();
+    LOG(WARNING) << "\n" << print_top_memory_tasks_profile();
+}
+
+} // namespace doris
diff --git a/be/src/runtime/memory/memory_profile.h
b/be/src/runtime/memory/memory_profile.h
new file mode 100644
index 00000000000..9f1bab0c02a
--- /dev/null
+++ b/be/src/runtime/memory/memory_profile.h
@@ -0,0 +1,82 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <common/multi_version.h>
+
+#include "util/runtime_profile.h"
+
+namespace doris {
+
+// Holds the latest memory-profile snapshots of the process. Each snapshot lives
+// in a MultiVersion, so it is rebuilt off to the side and swapped in with set().
+class MemoryProfile {
+public:
+    MemoryProfile();
+
+    // Rebuilds and publishes the overview, global and top-tasks snapshots.
+    void refresh_memory_overview_profile();
+    // Rebuilds and publishes the all-tasks snapshot
+    // (via MemTrackerLimiter::make_all_tasks_tracker_profile).
+    void refresh_tasks_memory_profile();
+
+    // Adds a "MemoryProfile" child to `profile` with the four latest snapshots merged in.
+    void make_memory_profile(RuntimeProfile* profile) const;
+
+    // Text renderings of the latest version of each snapshot.
+    std::string print_memory_overview_profile() const {
+        return pretty_print_latest(_memory_overview_profile);
+    }
+    std::string print_global_memory_profile() const {
+        return pretty_print_latest(_global_memory_profile);
+    }
+    std::string print_top_memory_tasks_profile() const {
+        return pretty_print_latest(_top_memory_tasks_profile);
+    }
+    std::string print_tasks_memory_profile() const {
+        return pretty_print_latest(_tasks_memory_profile);
+    }
+
+    // Per-type tracked memory sums as of the last refresh_memory_overview_profile().
+    static int64_t query_current_usage();
+    static int64_t load_current_usage();
+    static int64_t compaction_current_usage();
+    static int64_t schema_change_current_usage();
+    static int64_t other_current_usage();
+
+    // Re-arms print_log_process_usage(). Intended to be called when process memory
+    // changes by more than 256M, or when a GC ends.
+    void enable_print_log_process_usage() { _enable_print_log_process_usage = true; }
+    // Logs the process memory summary and profiles once, until re-armed.
+    void print_log_process_usage();
+
+private:
+    // Pretty-prints the version currently published in `source`.
+    static std::string pretty_print_latest(const MultiVersion<RuntimeProfile>& source) {
+        std::stringstream out;
+        source.get()->pretty_print(&out);
+        return out.str();
+    }
+
+    MultiVersion<RuntimeProfile> _memory_overview_profile;
+    MultiVersion<RuntimeProfile> _global_memory_profile;
+    MultiVersion<RuntimeProfile> _top_memory_tasks_profile;
+    MultiVersion<RuntimeProfile> _tasks_memory_profile;
+
+    // Starts armed so that the first failure is always logged.
+    std::atomic<bool> _enable_print_log_process_usage {true};
+};
+
+} // namespace doris
diff --git a/be/src/runtime/memory/memory_reclamation.cpp
b/be/src/runtime/memory/memory_reclamation.cpp
index 17f5a41f462..2d6098f7438 100644
--- a/be/src/runtime/memory/memory_reclamation.cpp
+++ b/be/src/runtime/memory/memory_reclamation.cpp
@@ -17,7 +17,8 @@
#include "runtime/memory/memory_reclamation.h"
-#include "runtime/memory/cache_manager.h"
+#include "runtime/exec_env.h"
+#include "runtime/memory/mem_tracker_limiter.h"
#include "runtime/workload_group/workload_group.h"
#include "runtime/workload_group/workload_group_manager.h"
#include "util/mem_info.h"
@@ -55,9 +56,15 @@ bool MemoryReclamation::process_minor_gc(std::string
mem_info) {
}
if (config::enable_query_memory_overcommit) {
- VLOG_NOTICE << MemTrackerLimiter::type_detail_usage(
- "[MemoryGC] before free top memory overcommit query in minor
GC",
- MemTrackerLimiter::Type::QUERY);
+ if (config::crash_in_memory_tracker_inaccurate) {
+ LOG(INFO) << fmt::format(
+ "[MemoryGC] before free top memory overcommit query in
minor GC, Type:{}, "
+ "Memory "
+ "Tracker Summary: {}",
+
MemTrackerLimiter::type_string(MemTrackerLimiter::Type::QUERY),
+ MemTrackerLimiter::make_type_trackers_profile_str(
+ MemTrackerLimiter::Type::QUERY));
+ }
RuntimeProfile* toq_profile =
profile->create_child("FreeTopOvercommitMemoryQuery", true,
true);
freed_mem += MemTrackerLimiter::free_top_overcommit_query(
@@ -98,8 +105,14 @@ bool MemoryReclamation::process_full_gc(std::string
mem_info) {
}
}
- VLOG_NOTICE << MemTrackerLimiter::type_detail_usage(
- "[MemoryGC] before free top memory query in full GC",
MemTrackerLimiter::Type::QUERY);
+ if (config::crash_in_memory_tracker_inaccurate) {
+ LOG(INFO) << fmt::format(
+ "[MemoryGC] before free top memory query in full GC, Type:{},
Memory Tracker "
+ "Summary: "
+ "{}",
+ MemTrackerLimiter::type_string(MemTrackerLimiter::Type::QUERY),
+
MemTrackerLimiter::make_type_trackers_profile_str(MemTrackerLimiter::Type::QUERY));
+ }
RuntimeProfile* tmq_profile = profile->create_child("FreeTopMemoryQuery",
true, true);
freed_mem += MemTrackerLimiter::free_top_memory_query(
MemInfo::process_full_gc_size() - freed_mem, mem_info,
tmq_profile);
@@ -108,9 +121,14 @@ bool MemoryReclamation::process_full_gc(std::string
mem_info) {
}
if (config::enable_query_memory_overcommit) {
- VLOG_NOTICE << MemTrackerLimiter::type_detail_usage(
- "[MemoryGC] before free top memory overcommit load in full GC",
- MemTrackerLimiter::Type::LOAD);
+ if (config::crash_in_memory_tracker_inaccurate) {
+ LOG(INFO) << fmt::format(
+ "[MemoryGC] before free top memory overcommit load in full
GC, Type:{}, Memory "
+ "Tracker Summary: {}",
+
MemTrackerLimiter::type_string(MemTrackerLimiter::Type::LOAD),
+ MemTrackerLimiter::make_type_trackers_profile_str(
+ MemTrackerLimiter::Type::LOAD));
+ }
RuntimeProfile* tol_profile =
profile->create_child("FreeTopMemoryOvercommitLoad", true,
true);
freed_mem += MemTrackerLimiter::free_top_overcommit_load(
@@ -120,8 +138,14 @@ bool MemoryReclamation::process_full_gc(std::string
mem_info) {
}
}
- VLOG_NOTICE << MemTrackerLimiter::type_detail_usage(
- "[MemoryGC] before free top memory load in full GC",
MemTrackerLimiter::Type::LOAD);
+ if (config::crash_in_memory_tracker_inaccurate) {
+ LOG(INFO) << fmt::format(
+ "[MemoryGC] before free top memory load in full GC, Type:{},
Memory Tracker "
+ "Summary: "
+ "{}",
+ MemTrackerLimiter::type_string(MemTrackerLimiter::Type::LOAD),
+
MemTrackerLimiter::make_type_trackers_profile_str(MemTrackerLimiter::Type::LOAD));
+ }
RuntimeProfile* tml_profile = profile->create_child("FreeTopMemoryLoad",
true, true);
freed_mem += MemTrackerLimiter::free_top_memory_load(
MemInfo::process_full_gc_size() - freed_mem, mem_info,
tml_profile);
diff --git a/be/src/runtime/memory/thread_mem_tracker_mgr.h
b/be/src/runtime/memory/thread_mem_tracker_mgr.h
index fd14750d8b8..db3b32a6298 100644
--- a/be/src/runtime/memory/thread_mem_tracker_mgr.h
+++ b/be/src/runtime/memory/thread_mem_tracker_mgr.h
@@ -111,7 +111,7 @@ public:
return fmt::format(
"ThreadMemTrackerMgr debug, _untracked_mem:{}, "
"_limiter_tracker:<{}>, _consumer_tracker_stack:<{}>",
- std::to_string(_untracked_mem), _limiter_tracker->log_usage(),
+ std::to_string(_untracked_mem),
_limiter_tracker->make_profile_str(),
fmt::to_string(consumer_tracker_buf));
}
diff --git a/be/src/runtime/process_profile.cpp
b/be/src/runtime/process_profile.cpp
new file mode 100644
index 00000000000..d91aedbeac2
--- /dev/null
+++ b/be/src/runtime/process_profile.cpp
@@ -0,0 +1,44 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "runtime/process_profile.h"
+
+#include <memory>
+
+#include "runtime/memory/memory_profile.h"
+
+namespace doris {
+
+ProcessProfile::ProcessProfile() : _memory_profile(std::make_unique<MemoryProfile>()) {}
+
+// Refreshes every sub-profile, then assembles and publishes a new root
+// "ProcessProfile" snapshot built from them.
+void ProcessProfile::refresh_profile() {
+    // Step 1: refresh the sub-profiles' own snapshots.
+    _memory_profile->refresh_memory_overview_profile();
+    _memory_profile->refresh_tasks_memory_profile();
+    // TODO refresh other profile
+
+    // Step 2: build the root profile from the fresh snapshots and publish it.
+    auto root = std::make_unique<RuntimeProfile>("ProcessProfile");
+    _memory_profile->make_memory_profile(root.get());
+    // TODO make other profile
+    _process_profile.set(std::move(root));
+}
+
+} // namespace doris
diff --git a/be/src/runtime/process_profile.h b/be/src/runtime/process_profile.h
new file mode 100644
index 00000000000..24b128ab552
--- /dev/null
+++ b/be/src/runtime/process_profile.h
@@ -0,0 +1,62 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <common/multi_version.h>
+
+#include "runtime/exec_env.h"
+#include "runtime/memory/memory_profile.h"
+#include "util/runtime_profile.h"
+
+namespace doris {
+
+// Process-wide profile root. It currently aggregates only the memory profile;
+// the latest assembled root is published through a MultiVersion.
+class ProcessProfile {
+public:
+    // Heap-allocates an instance for registration with
+    // ExecEnv::set_process_profile(). NOTE(review): ownership after registration
+    // is decided by ExecEnv; confirm who deletes it.
+    static ProcessProfile* create_global_instance() { return new ProcessProfile(); }
+    // Returns the instance registered with ExecEnv.
+    static ProcessProfile* instance() { return ExecEnv::GetInstance()->get_process_profile(); }
+    ProcessProfile();
+
+    // Refreshes all sub-profiles and publishes a new root snapshot.
+    void refresh_profile();
+
+    // Text dump of the latest root snapshot, including the root node.
+    std::string print_process_profile() const {
+        std::stringstream out;
+        _process_profile.get()->pretty_print(&out);
+        return out.str();
+    }
+
+    // Text dump of the latest root snapshot's children (as returned by
+    // get_children()), without the root node itself.
+    std::string print_process_profile_no_root() const {
+        auto snapshot = _process_profile.get();
+        // get_children() is presumably non-const, so the const snapshot is cast.
+        auto* root = const_cast<doris::RuntimeProfile*>(snapshot.get());
+        std::vector<RuntimeProfile*> children;
+        root->get_children(&children);
+        std::stringstream out;
+        for (RuntimeProfile* child : children) {
+            child->pretty_print(&out);
+        }
+        return out.str();
+    }
+
+    MemoryProfile* memory_profile() { return _memory_profile.get(); }
+
+private:
+    MultiVersion<RuntimeProfile> _process_profile;
+    std::unique_ptr<MemoryProfile> _memory_profile;
+};
+
+} // namespace doris
diff --git a/be/src/runtime/routine_load/routine_load_task_executor.cpp
b/be/src/runtime/routine_load/routine_load_task_executor.cpp
index 2c69b8a5870..84f0d283cac 100644
--- a/be/src/runtime/routine_load/routine_load_task_executor.cpp
+++ b/be/src/runtime/routine_load/routine_load_task_executor.cpp
@@ -42,6 +42,7 @@
#include "io/fs/multi_table_pipe.h"
#include "io/fs/stream_load_pipe.h"
#include "runtime/exec_env.h"
+#include "runtime/memory/memory_profile.h"
#include "runtime/message_body_sink.h"
#include "runtime/routine_load/data_consumer.h"
#include "runtime/routine_load/data_consumer_group.h"
@@ -314,8 +315,7 @@ Status RoutineLoadTaskExecutor::submit_task(const
TRoutineLoadTask& task) {
bool RoutineLoadTaskExecutor::_reach_memory_limit() {
bool is_exceed_soft_mem_limit =
GlobalMemoryArbitrator::is_exceed_soft_mem_limit();
- auto current_load_mem_value =
-
MemTrackerLimiter::TypeMemSum[MemTrackerLimiter::Type::LOAD].current_value();
+ auto current_load_mem_value = MemoryProfile::load_current_usage();
if (is_exceed_soft_mem_limit || current_load_mem_value > _load_mem_limit) {
LOG(INFO) << "is_exceed_soft_mem_limit: " << is_exceed_soft_mem_limit
<< " current_load_mem_value: " << current_load_mem_value
diff --git a/be/src/util/mem_info.cpp b/be/src/util/mem_info.cpp
index b1bcfdcc56b..36579452db3 100644
--- a/be/src/util/mem_info.cpp
+++ b/be/src/util/mem_info.cpp
@@ -74,9 +74,9 @@ std::atomic<int64_t> MemInfo::_s_je_dirty_pages_mem =
std::numeric_limits<int64_
std::atomic<int64_t> MemInfo::_s_je_dirty_pages_mem_limit =
std::numeric_limits<int64_t>::max();
std::atomic<int64_t> MemInfo::_s_virtual_memory_used = 0;
-int64_t MemInfo::_s_cgroup_mem_limit = std::numeric_limits<int64_t>::max();
-int64_t MemInfo::_s_cgroup_mem_usage = std::numeric_limits<int64_t>::min();
-bool MemInfo::_s_cgroup_mem_refresh_state = false;
+std::atomic<int64_t> MemInfo::_s_cgroup_mem_limit =
std::numeric_limits<int64_t>::max();
+std::atomic<int64_t> MemInfo::_s_cgroup_mem_usage =
std::numeric_limits<int64_t>::min();
+std::atomic<bool> MemInfo::_s_cgroup_mem_refresh_state = false;
int64_t MemInfo::_s_cgroup_mem_refresh_wait_times = 0;
static std::unordered_map<std::string, int64_t> _mem_info_bytes;
@@ -94,7 +94,7 @@ void MemInfo::refresh_allocator_mem() {
#elif defined(USE_JEMALLOC)
// jemalloc mallctl refer to : https://jemalloc.net/jemalloc.3.html
// https://www.bookstack.cn/read/aliyun-rds-core/4a0cdf677f62feb3.md
- // Check the Doris BE web page `http://ip:webserver_port/memz` to get the
Jemalloc Profile.
+ // Check the Doris BE web page `http://ip:webserver_port/memory` to get
the Jemalloc Profile.
// 'epoch' is a special mallctl -- it updates the statistics. Without it,
all
// the following calls will return stale values. It increments and returns
@@ -191,7 +191,8 @@ void MemInfo::refresh_proc_meminfo() {
// refresh cgroup memory
if (config::enable_use_cgroup_memory_info) {
if (_s_cgroup_mem_refresh_wait_times >= 0) {
- auto status =
CGroupMemoryCtl::find_cgroup_mem_limit(&_s_cgroup_mem_limit);
+ int64_t cgroup_mem_limit;
+ auto status =
CGroupMemoryCtl::find_cgroup_mem_limit(&cgroup_mem_limit);
if (!status.ok()) {
_s_cgroup_mem_limit = std::numeric_limits<int64_t>::max();
// find cgroup limit failed, wait 300s, 1000 * 100ms.
@@ -200,6 +201,7 @@ void MemInfo::refresh_proc_meminfo() {
"mem limit: "
<< _s_cgroup_mem_limit;
} else {
+ _s_cgroup_mem_limit = cgroup_mem_limit;
// wait 10s, 100 * 100ms, avoid too frequently.
_s_cgroup_mem_refresh_wait_times = -100;
}
@@ -208,11 +210,13 @@ void MemInfo::refresh_proc_meminfo() {
}
if (_s_cgroup_mem_limit != std::numeric_limits<int64_t>::max()) {
- auto status =
CGroupMemoryCtl::find_cgroup_mem_usage(&_s_cgroup_mem_usage);
+ int64_t cgroup_mem_usage;
+ auto status =
CGroupMemoryCtl::find_cgroup_mem_usage(&cgroup_mem_usage);
if (!status.ok()) {
_s_cgroup_mem_usage = std::numeric_limits<int64_t>::min();
_s_cgroup_mem_refresh_state = false;
} else {
+ _s_cgroup_mem_usage = cgroup_mem_usage;
_s_cgroup_mem_refresh_state = true;
}
} else {
@@ -231,7 +235,8 @@ void MemInfo::refresh_proc_meminfo() {
if (physical_mem < 0) {
physical_mem = _s_cgroup_mem_limit;
} else {
- physical_mem = std::min(physical_mem, _s_cgroup_mem_limit);
+ physical_mem =
+ std::min(physical_mem,
_s_cgroup_mem_limit.load(std::memory_order_relaxed));
}
}
diff --git a/be/src/util/mem_info.h b/be/src/util/mem_info.h
index 60ce26016b1..39ae9eb0b79 100644
--- a/be/src/util/mem_info.h
+++ b/be/src/util/mem_info.h
@@ -219,6 +219,18 @@ public:
return
PrettyPrinter::print(_s_soft_mem_limit.load(std::memory_order_relaxed),
TUnit::BYTES);
}
+    // Cgroup memory limit in bytes; std::numeric_limits<int64_t>::max() when no
+    // limit was found (or the lookup failed) during refresh_proc_meminfo().
+    static inline int64_t cgroup_mem_limit() {
+        DCHECK(_s_initialized);
+        return _s_cgroup_mem_limit.load(std::memory_order_relaxed);
+    }
+    // Cgroup memory usage in bytes; std::numeric_limits<int64_t>::min() when the
+    // last usage lookup failed.
+    static inline int64_t cgroup_mem_usage() {
+        DCHECK(_s_initialized);
+        return _s_cgroup_mem_usage.load(std::memory_order_relaxed);
+    }
+    // Whether the last cgroup memory usage refresh succeeded. Returns bool to match
+    // the std::atomic<bool> it reads; the previous int64_t return type silently
+    // widened the flag to an integer.
+    static inline bool cgroup_mem_refresh_state() {
+        DCHECK(_s_initialized);
+        return _s_cgroup_mem_refresh_state.load(std::memory_order_relaxed);
+    }
static std::string debug_string();
@@ -236,9 +248,9 @@ private:
static std::atomic<int64_t> _s_je_dirty_pages_mem_limit;
static std::atomic<int64_t> _s_virtual_memory_used;
- static int64_t _s_cgroup_mem_limit;
- static int64_t _s_cgroup_mem_usage;
- static bool _s_cgroup_mem_refresh_state;
+ static std::atomic<int64_t> _s_cgroup_mem_limit;
+ static std::atomic<int64_t> _s_cgroup_mem_usage;
+ static std::atomic<bool> _s_cgroup_mem_refresh_state;
static int64_t _s_cgroup_mem_refresh_wait_times;
static std::atomic<int64_t> _s_sys_mem_available;
diff --git a/be/src/util/runtime_profile.cpp b/be/src/util/runtime_profile.cpp
index a9e197fba9b..e87301880d2 100644
--- a/be/src/util/runtime_profile.cpp
+++ b/be/src/util/runtime_profile.cpp
@@ -274,7 +274,7 @@ void RuntimeProfile::compute_time_in_profile(int64_t total)
{
RuntimeProfile* RuntimeProfile::create_child(const std::string& name, bool
indent, bool prepend) {
std::lock_guard<std::mutex> l(_children_lock);
- DCHECK(_child_map.find(name) == _child_map.end());
+ DCHECK(_child_map.find(name) == _child_map.end()) << ", name: " << name;
RuntimeProfile* child = _pool->add(new RuntimeProfile(name));
if (this->is_set_metadata()) {
child->set_metadata(this->metadata());
@@ -285,8 +285,8 @@ RuntimeProfile* RuntimeProfile::create_child(const
std::string& name, bool inden
if (_children.empty()) {
add_child_unlock(child, indent, nullptr);
} else {
- ChildVector::iterator pos = prepend ? _children.begin() :
_children.end();
- add_child_unlock(child, indent, (*pos).first);
+ auto* pos = prepend ? _children.begin()->first : nullptr;
+ add_child_unlock(child, indent, pos);
}
return child;
}
diff --git a/be/src/util/runtime_profile.h b/be/src/util/runtime_profile.h
index b77157d1f5b..955d77b72aa 100644
--- a/be/src/util/runtime_profile.h
+++ b/be/src/util/runtime_profile.h
@@ -51,8 +51,8 @@ class TRuntimeProfileTree;
#define MACRO_CONCAT(x, y) CONCAT_IMPL(x, y)
#define ADD_LABEL_COUNTER(profile, name) (profile)->add_counter(name,
TUnit::NONE)
-#define ADD_LABEL_COUNTER_WITH_LEVEL(profile, name, type) \
- (profile)->add_counter_with_level(name, TUnit::NONE, type)
+#define ADD_LABEL_COUNTER_WITH_LEVEL(profile, name, level) \
+ (profile)->add_counter_with_level(name, TUnit::NONE, level)
#define ADD_COUNTER(profile, name, type) (profile)->add_counter(name, type)
#define ADD_COUNTER_WITH_LEVEL(profile, name, type, level) \
(profile)->add_counter_with_level(name, type, level)
diff --git a/be/src/vec/common/allocator.cpp b/be/src/vec/common/allocator.cpp
index 19969abf6cc..c8f0a7397d7 100644
--- a/be/src/vec/common/allocator.cpp
+++ b/be/src/vec/common/allocator.cpp
@@ -30,12 +30,10 @@
// Allocator is used by too many files. For compilation speed, put
dependencies in `.cpp` as much as possible.
#include "common/compiler_util.h"
#include "common/status.h"
-#include "runtime/fragment_mgr.h"
#include "runtime/memory/global_memory_arbitrator.h"
-#include "runtime/memory/mem_tracker_limiter.h"
#include "runtime/memory/thread_mem_tracker_mgr.h"
+#include "runtime/process_profile.h"
#include "runtime/thread_context.h"
-#include "util/defer_op.h"
#include "util/mem_info.h"
#include "util/stack_util.h"
#include "util/uid_util.h"
@@ -135,7 +133,7 @@ void Allocator<clear_memory_, mmap_populate, use_mmap,
MemoryAllocator>::sys_mem
if (wait_milliseconds >=
doris::config::thread_wait_gc_max_milliseconds) {
// Make sure to completely wait
thread_wait_gc_max_milliseconds only once.
doris::thread_context()->thread_mem_tracker_mgr->disable_wait_gc();
- doris::MemTrackerLimiter::print_log_process_usage();
+
doris::ProcessProfile::instance()->memory_profile()->print_log_process_usage();
// If the external catch, throw bad::alloc first, let the
query actively cancel. Otherwise asynchronous cancel.
if (!doris::enable_thread_catch_bad_alloc) {
LOG(INFO) << fmt::format(
@@ -154,7 +152,6 @@ void Allocator<clear_memory_, mmap_populate, use_mmap,
MemoryAllocator>::sys_mem
// else, enough memory is available, the query continues execute.
} else if (doris::enable_thread_catch_bad_alloc) {
LOG(INFO) << fmt::format("sys memory check failed, throw
exception, {}.", err_msg);
- doris::MemTrackerLimiter::print_log_process_usage();
throw doris::Exception(doris::ErrorCode::MEM_ALLOC_FAILED,
err_msg);
} else {
LOG(INFO) << fmt::format("sys memory check failed, no throw
exception, {}.", err_msg);
@@ -225,7 +222,7 @@ void Allocator<clear_memory_, mmap_populate, use_mmap,
MemoryAllocator>::throw_b
<< fmt::format("{}, Stacktrace: {}",
doris::GlobalMemoryArbitrator::process_mem_log_str(),
doris::get_stack_trace());
- doris::MemTrackerLimiter::print_log_process_usage();
+
doris::ProcessProfile::instance()->memory_profile()->print_log_process_usage();
throw doris::Exception(doris::ErrorCode::MEM_ALLOC_FAILED, err);
}
diff --git a/be/test/runtime/memory/mem_tracker_test.cpp
b/be/test/runtime/memory/mem_tracker_test.cpp
index 49f6aa3bf0c..eb66635ce07 100644
--- a/be/test/runtime/memory/mem_tracker_test.cpp
+++ b/be/test/runtime/memory/mem_tracker_test.cpp
@@ -26,7 +26,7 @@
namespace doris {
TEST(MemTrackerTest, SingleTrackerNoLimit) {
- auto t = MemTrackerLimiter::create_shared(MemTrackerLimiter::Type::GLOBAL);
+ auto t = MemTrackerLimiter::create_shared(MemTrackerLimiter::Type::GLOBAL,
"UT");
EXPECT_FALSE(t->has_limit());
t->consume(10);
EXPECT_EQ(t->consumption(), 10);
diff --git a/be/test/testutil/run_all_tests.cpp
b/be/test/testutil/run_all_tests.cpp
index a0fc174aeda..5207279a291 100644
--- a/be/test/testutil/run_all_tests.cpp
+++ b/be/test/testutil/run_all_tests.cpp
@@ -55,6 +55,8 @@ int main(int argc, char** argv) {
"BE-UT");
doris::thread_context()->thread_mem_tracker_mgr->attach_limiter_tracker(test_tracker);
doris::ExecEnv::GetInstance()->set_cache_manager(doris::CacheManager::create_global_instance());
+ doris::ExecEnv::GetInstance()->set_process_profile(
+ doris::ProcessProfile::create_global_instance());
doris::ExecEnv::GetInstance()->set_dummy_lru_cache(std::make_shared<doris::DummyLRUCache>());
doris::ExecEnv::GetInstance()->set_storage_page_cache(
doris::StoragePageCache::create_global_cache(1 << 30, 10, 0));
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]