This is an automated email from the ASF dual-hosted git repository.
lihaopeng pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.1 by this push:
new a61030215e4 [branch-2.1](memory) Support make all memory snapshots (#37705)
a61030215e4 is described below
commit a61030215e4b039fd6b5b544227d81ecd23d9bc7
Author: Xinyi Zou <[email protected]>
AuthorDate: Fri Jul 12 16:21:37 2024 +0800
[branch-2.1](memory) Support make all memory snapshots (#37705)
pick #36679
---
be/src/common/daemon.cpp | 8 ++--
be/src/http/default_path_handlers.cpp | 2 +
be/src/runtime/memory/mem_tracker.cpp | 16 +++++++-
be/src/runtime/memory/mem_tracker.h | 35 +++++++++++++++++
be/src/runtime/memory/mem_tracker_limiter.cpp | 45 +++++++++++++++-------
be/src/runtime/memory/mem_tracker_limiter.h | 37 ++----------------
...emory_arbitrator.cpp => memory_reclamation.cpp} | 13 ++++---
.../{memory_arbitrator.h => memory_reclamation.h} | 2 +-
be/src/vec/sink/writer/vtablet_writer.cpp | 6 +--
9 files changed, 102 insertions(+), 62 deletions(-)
diff --git a/be/src/common/daemon.cpp b/be/src/common/daemon.cpp
index 77d0fdaf0e5..d54189bce23 100644
--- a/be/src/common/daemon.cpp
+++ b/be/src/common/daemon.cpp
@@ -50,7 +50,7 @@
#include "runtime/memory/global_memory_arbitrator.h"
#include "runtime/memory/mem_tracker.h"
#include "runtime/memory/mem_tracker_limiter.h"
-#include "runtime/memory/memory_arbitrator.h"
+#include "runtime/memory/memory_reclamation.h"
#include "runtime/runtime_query_statistics_mgr.h"
#include "runtime/workload_group/workload_group_manager.h"
#include "util/cpu_info.h"
@@ -234,7 +234,7 @@ void Daemon::memory_gc_thread() {
auto process_memory_usage =
doris::GlobalMemoryArbitrator::process_memory_usage();
// GC excess memory for resource groups that not enable overcommit
- auto tg_free_mem =
doris::MemoryArbitrator::tg_disable_overcommit_group_gc();
+ auto tg_free_mem =
doris::MemoryReclamation::tg_disable_overcommit_group_gc();
sys_mem_available += tg_free_mem;
process_memory_usage -= tg_free_mem;
@@ -248,7 +248,7 @@ void Daemon::memory_gc_thread() {
memory_minor_gc_sleep_time_ms = memory_gc_sleep_time_ms;
LOG(INFO) << fmt::format("[MemoryGC] start full GC, {}.",
mem_info);
doris::MemTrackerLimiter::print_log_process_usage();
- if (doris::MemoryArbitrator::process_full_gc(std::move(mem_info)))
{
+ if
(doris::MemoryReclamation::process_full_gc(std::move(mem_info))) {
// If there is not enough memory to be gc, the process memory
usage will not be printed in the next continuous gc.
doris::MemTrackerLimiter::enable_print_log_process_usage();
}
@@ -261,7 +261,7 @@ void Daemon::memory_gc_thread() {
memory_minor_gc_sleep_time_ms = memory_gc_sleep_time_ms;
LOG(INFO) << fmt::format("[MemoryGC] start minor GC, {}.",
mem_info);
doris::MemTrackerLimiter::print_log_process_usage();
- if
(doris::MemoryArbitrator::process_minor_gc(std::move(mem_info))) {
+ if
(doris::MemoryReclamation::process_minor_gc(std::move(mem_info))) {
doris::MemTrackerLimiter::enable_print_log_process_usage();
}
} else {
diff --git a/be/src/http/default_path_handlers.cpp b/be/src/http/default_path_handlers.cpp
index 5c697539fbc..8d1a14ffda3 100644
--- a/be/src/http/default_path_handlers.cpp
+++ b/be/src/http/default_path_handlers.cpp
@@ -158,6 +158,8 @@ void mem_tracker_handler(const WebPageHandler::ArgumentMap& args, std::stringstr
MemTrackerLimiter::make_type_snapshots(&snapshots,
MemTrackerLimiter::Type::OTHER);
} else if (iter->second == "reserved_memory") {
GlobalMemoryArbitrator::make_reserved_memory_snapshots(&snapshots);
+ } else if (iter->second == "all") {
+ MemTrackerLimiter::make_all_memory_state_snapshots(&snapshots);
}
} else {
(*output) << "<h4>*Notice:</h4>\n";
diff --git a/be/src/runtime/memory/mem_tracker.cpp b/be/src/runtime/memory/mem_tracker.cpp
index 27b16c76f2c..f5a3853f79f 100644
--- a/be/src/runtime/memory/mem_tracker.cpp
+++ b/be/src/runtime/memory/mem_tracker.cpp
@@ -45,9 +45,11 @@ MemTracker::MemTracker(const std::string& label, MemTrackerLimiter* parent) : _l
void MemTracker::bind_parent(MemTrackerLimiter* parent) {
if (parent) {
+ _type = parent->type();
_parent_label = parent->label();
_parent_group_num = parent->group_num();
} else {
+ _type = thread_context()->thread_mem_tracker()->type();
_parent_label = thread_context()->thread_mem_tracker()->label();
_parent_group_num =
thread_context()->thread_mem_tracker()->group_num();
}
@@ -72,6 +74,7 @@ MemTracker::~MemTracker() {
MemTracker::Snapshot MemTracker::make_snapshot() const {
Snapshot snapshot;
+ snapshot.type = type_string(_type);
snapshot.label = _label;
snapshot.parent_label = _parent_label;
snapshot.limit = -1;
@@ -83,13 +86,24 @@ MemTracker::Snapshot MemTracker::make_snapshot() const {
void MemTracker::make_group_snapshot(std::vector<MemTracker::Snapshot>*
snapshots,
int64_t group_num, std::string
parent_label) {
std::lock_guard<std::mutex> l(mem_tracker_pool[group_num].group_lock);
- for (auto tracker : mem_tracker_pool[group_num].trackers) {
+ for (auto* tracker : mem_tracker_pool[group_num].trackers) {
if (tracker->parent_label() == parent_label &&
tracker->peak_consumption() != 0) {
snapshots->push_back(tracker->make_snapshot());
}
}
}
+void MemTracker::make_all_trackers_snapshots(std::vector<Snapshot>* snapshots)
{
+ for (auto& i : mem_tracker_pool) {
+ std::lock_guard<std::mutex> l(i.group_lock);
+ for (auto* tracker : i.trackers) {
+ if (tracker->peak_consumption() != 0) {
+ snapshots->push_back(tracker->make_snapshot());
+ }
+ }
+ }
+}
+
std::string MemTracker::log_usage(MemTracker::Snapshot snapshot) {
return fmt::format("MemTracker Label={}, Parent Label={}, Used={}({} B),
Peak={}({} B)",
snapshot.label, snapshot.parent_label,
print_bytes(snapshot.cur_consumption),
diff --git a/be/src/runtime/memory/mem_tracker.h b/be/src/runtime/memory/mem_tracker.h
index d308d201901..8a574398e0e 100644
--- a/be/src/runtime/memory/mem_tracker.h
+++ b/be/src/runtime/memory/mem_tracker.h
@@ -63,6 +63,36 @@ public:
std::mutex group_lock;
};
+ enum class Type {
+ GLOBAL = 0, // Life cycle is the same as the process, e.g.
Cache and default Orphan
+ QUERY = 1, // Count the memory consumption of all Query tasks.
+ LOAD = 2, // Count the memory consumption of all Load tasks.
+ COMPACTION = 3, // Count the memory consumption of all Base and
Cumulative tasks.
+ SCHEMA_CHANGE = 4, // Count the memory consumption of all SchemaChange
tasks.
+ OTHER = 5
+ };
+
+ static std::string type_string(Type type) {
+ switch (type) {
+ case Type::GLOBAL:
+ return "global";
+ case Type::QUERY:
+ return "query";
+ case Type::LOAD:
+ return "load";
+ case Type::COMPACTION:
+ return "compaction";
+ case Type::SCHEMA_CHANGE:
+ return "schema_change";
+ case Type::OTHER:
+ return "other";
+ default:
+ LOG(FATAL) << "not match type of mem tracker limiter :" <<
static_cast<int>(type);
+ }
+ LOG(FATAL) << "__builtin_unreachable";
+ __builtin_unreachable();
+ }
+
// A counter that keeps track of the current and peak value seen.
// Relaxed ordering, not accurate in real time.
class MemCounter {
@@ -127,6 +157,7 @@ public:
}
public:
+ Type type() const { return _type; }
const std::string& label() const { return _label; }
const std::string& parent_label() const { return _parent_label; }
const std::string& set_parent_label() const { return _parent_label; }
@@ -160,6 +191,7 @@ public:
// Specify group_num from mem_tracker_pool to generate snapshot.
static void make_group_snapshot(std::vector<Snapshot>* snapshots, int64_t
group_num,
std::string parent_label);
+ static void make_all_trackers_snapshots(std::vector<Snapshot>* snapshots);
static std::string log_usage(MemTracker::Snapshot snapshot);
virtual std::string debug_string() {
@@ -173,6 +205,8 @@ public:
protected:
void bind_parent(MemTrackerLimiter* parent);
+ Type _type;
+
// label used in the make snapshot, not guaranteed unique.
std::string _label;
@@ -180,6 +214,7 @@ protected:
// Tracker is located in group num in mem_tracker_pool
int64_t _parent_group_num = 0;
+
// Use _parent_label to correlate with parent limiter tracker.
std::string _parent_label = "-";
diff --git a/be/src/runtime/memory/mem_tracker_limiter.cpp b/be/src/runtime/memory/mem_tracker_limiter.cpp
index 489d59ab1b1..e79ca1bfb3a 100644
--- a/be/src/runtime/memory/mem_tracker_limiter.cpp
+++ b/be/src/runtime/memory/mem_tracker_limiter.cpp
@@ -181,8 +181,8 @@ void MemTrackerLimiter::make_process_snapshots(std::vector<MemTracker::Snapshot>
int64_t all_tracker_mem_sum = 0;
Snapshot snapshot;
for (auto it : MemTrackerLimiter::TypeMemSum) {
- snapshot.type = type_string(it.first);
- snapshot.label = "";
+ snapshot.type = "overview";
+ snapshot.label = type_string(it.first);
snapshot.limit = -1;
snapshot.cur_consumption = it.second->current_value();
snapshot.peak_consumption = it.second->peak_value();
@@ -190,41 +190,41 @@ void MemTrackerLimiter::make_process_snapshots(std::vector<MemTracker::Snapshot>
all_tracker_mem_sum += it.second->current_value();
}
- snapshot.type = "tc/jemalloc cache";
- snapshot.label = "";
+ snapshot.type = "overview";
+ snapshot.label = "tc/jemalloc cache";
snapshot.limit = -1;
snapshot.cur_consumption = MemInfo::allocator_cache_mem();
snapshot.peak_consumption = -1;
(*snapshots).emplace_back(snapshot);
all_tracker_mem_sum += MemInfo::allocator_cache_mem();
- snapshot.type = "sum of all trackers"; // is virtual memory
- snapshot.label = "";
+ snapshot.type = "overview";
+ snapshot.label = "sum of all trackers"; // is virtual memory
snapshot.limit = -1;
snapshot.cur_consumption = all_tracker_mem_sum;
snapshot.peak_consumption = -1;
(*snapshots).emplace_back(snapshot);
+ snapshot.type = "overview";
#ifdef ADDRESS_SANITIZER
- snapshot.type = "[ASAN]process resident memory"; // from /proc VmRSS VmHWM
+ snapshot.label = "[ASAN]process resident memory"; // from /proc VmRSS VmHWM
#else
- snapshot.type = "process resident memory"; // from /proc VmRSS VmHWM
+ snapshot.label = "process resident memory"; // from /proc VmRSS VmHWM
#endif
- snapshot.label = "";
snapshot.limit = -1;
snapshot.cur_consumption = PerfCounters::get_vm_rss();
snapshot.peak_consumption = PerfCounters::get_vm_hwm();
(*snapshots).emplace_back(snapshot);
- snapshot.type = "reserved memory";
- snapshot.label = "";
+ snapshot.type = "overview";
+ snapshot.label = "reserved memory";
snapshot.limit = -1;
snapshot.cur_consumption =
GlobalMemoryArbitrator::process_reserved_memory();
snapshot.peak_consumption = -1;
(*snapshots).emplace_back(snapshot);
- snapshot.type = "process virtual memory"; // from /proc VmSize VmPeak
- snapshot.label = "";
+ snapshot.type = "overview";
+ snapshot.label = "process virtual memory"; // from /proc VmSize VmPeak
snapshot.limit = -1;
snapshot.cur_consumption = PerfCounters::get_vm_size();
snapshot.peak_consumption = PerfCounters::get_vm_peak();
@@ -281,6 +281,25 @@ void MemTrackerLimiter::make_top_consumption_snapshots(std::vector<MemTracker::S
}
}
+void
MemTrackerLimiter::make_all_trackers_snapshots(std::vector<MemTracker::Snapshot>*
snapshots) {
+ for (auto& i : ExecEnv::GetInstance()->mem_tracker_limiter_pool) {
+ std::lock_guard<std::mutex> l(i.group_lock);
+ for (auto trackerWptr : i.trackers) {
+ auto tracker = trackerWptr.lock();
+ if (tracker != nullptr) {
+ (*snapshots).emplace_back(tracker->make_snapshot());
+ }
+ }
+ }
+}
+
+void MemTrackerLimiter::make_all_memory_state_snapshots(
+ std::vector<MemTracker::Snapshot>* snapshots) {
+ make_process_snapshots(snapshots);
+ make_all_trackers_snapshots(snapshots);
+ MemTracker::make_all_trackers_snapshots(snapshots);
+}
+
std::string MemTrackerLimiter::log_usage(MemTracker::Snapshot snapshot) {
return fmt::format(
"MemTrackerLimiter Label={}, Type={}, Limit={}({} B), Used={}({}
B), Peak={}({} B)",
diff --git a/be/src/runtime/memory/mem_tracker_limiter.h b/be/src/runtime/memory/mem_tracker_limiter.h
index 2c4221373be..e6cf8410c30 100644
--- a/be/src/runtime/memory/mem_tracker_limiter.h
+++ b/be/src/runtime/memory/mem_tracker_limiter.h
@@ -66,15 +66,6 @@ struct TrackerLimiterGroup {
// will be recorded on this Query, otherwise it will be recorded in Orphan
Tracker by default.
class MemTrackerLimiter final : public MemTracker {
public:
- enum class Type {
- GLOBAL = 0, // Life cycle is the same as the process, e.g.
Cache and default Orphan
- QUERY = 1, // Count the memory consumption of all Query tasks.
- LOAD = 2, // Count the memory consumption of all Load tasks.
- COMPACTION = 3, // Count the memory consumption of all Base and
Cumulative tasks.
- SCHEMA_CHANGE = 4, // Count the memory consumption of all SchemaChange
tasks.
- OTHER = 5
- };
-
// TODO There are more and more GC codes and there should be a separate
manager class.
enum class GCType { PROCESS = 0, WORK_LOAD_GROUP = 1 };
@@ -95,27 +86,6 @@ public:
~MemTrackerLimiter() override;
- static std::string type_string(Type type) {
- switch (type) {
- case Type::GLOBAL:
- return "global";
- case Type::QUERY:
- return "query";
- case Type::LOAD:
- return "load";
- case Type::COMPACTION:
- return "compaction";
- case Type::SCHEMA_CHANGE:
- return "schema_change";
- case Type::OTHER:
- return "other";
- default:
- LOG(FATAL) << "not match type of mem tracker limiter :" <<
static_cast<int>(type);
- }
- LOG(FATAL) << "__builtin_unreachable";
- __builtin_unreachable();
- }
-
static std::string gc_type_string(GCType type) {
switch (type) {
case GCType::PROCESS:
@@ -130,7 +100,6 @@ public:
}
void set_consumption() { LOG(FATAL) << "MemTrackerLimiter set_consumption
not supported"; }
- Type type() const { return _type; }
int64_t group_num() const { return _group_num; }
bool has_limit() const { return _limit >= 0; }
int64_t limit() const { return _limit; }
@@ -177,11 +146,13 @@ public:
// Returns a list of all the valid tracker snapshots.
static void make_process_snapshots(std::vector<MemTracker::Snapshot>*
snapshots);
static void make_type_snapshots(std::vector<MemTracker::Snapshot>*
snapshots, Type type);
+ static void make_all_trackers_snapshots(std::vector<MemTracker::Snapshot>*
snapshots);
+ static void
make_all_memory_state_snapshots(std::vector<MemTracker::Snapshot>* snapshots);
static void
make_top_consumption_snapshots(std::vector<MemTracker::Snapshot>* snapshots,
int top_num);
static std::string log_usage(MemTracker::Snapshot snapshot);
- std::string log_usage() { return log_usage(make_snapshot()); }
+ std::string log_usage() const { return log_usage(make_snapshot()); }
static std::string type_log_usage(MemTracker::Snapshot snapshot);
static std::string type_detail_usage(const std::string& msg, Type type);
void print_log_usage(const std::string& msg);
@@ -258,8 +229,6 @@ private:
int64_t add_untracked_mem(int64_t bytes);
private:
- Type _type;
-
// Limit on memory consumption, in bytes.
int64_t _limit;
diff --git a/be/src/runtime/memory/memory_arbitrator.cpp b/be/src/runtime/memory/memory_reclamation.cpp
similarity index 95%
rename from be/src/runtime/memory/memory_arbitrator.cpp
rename to be/src/runtime/memory/memory_reclamation.cpp
index a99f358526a..536c4658c8c 100644
--- a/be/src/runtime/memory/memory_arbitrator.cpp
+++ b/be/src/runtime/memory/memory_reclamation.cpp
@@ -15,7 +15,7 @@
// specific language governing permissions and limitations
// under the License.
-#include "runtime/memory/memory_arbitrator.h"
+#include "runtime/memory/memory_reclamation.h"
#include "runtime/memory/cache_manager.h"
#include "runtime/workload_group/workload_group.h"
@@ -30,7 +30,7 @@ namespace doris {
// step2: free resource groups memory that enable overcommit
// step3: free global top overcommit query, if enable query memory overcommit
// TODO Now, the meaning is different from java minor gc + full gc, more like
small gc + large gc.
-bool MemoryArbitrator::process_minor_gc(std::string mem_info) {
+bool MemoryReclamation::process_minor_gc(std::string mem_info) {
MonotonicStopWatch watch;
watch.start();
int64_t freed_mem = 0;
@@ -81,7 +81,7 @@ bool MemoryArbitrator::process_minor_gc(std::string mem_info) {
// step3: free global top memory query
// step4: free top overcommit load, load retries are more expensive, So cancel
at the end.
// step5: free top memory load
-bool MemoryArbitrator::process_full_gc(std::string mem_info) {
+bool MemoryReclamation::process_full_gc(std::string mem_info) {
MonotonicStopWatch watch;
watch.start();
int64_t freed_mem = 0;
@@ -142,7 +142,7 @@ bool MemoryArbitrator::process_full_gc(std::string mem_info) {
return freed_mem > MemInfo::process_full_gc_size();
}
-int64_t MemoryArbitrator::tg_disable_overcommit_group_gc() {
+int64_t MemoryReclamation::tg_disable_overcommit_group_gc() {
MonotonicStopWatch watch;
watch.start();
std::vector<WorkloadGroupPtr> task_groups;
@@ -196,8 +196,9 @@ int64_t MemoryArbitrator::tg_disable_overcommit_group_gc() {
return total_free_memory;
}
-int64_t MemoryArbitrator::tg_enable_overcommit_group_gc(int64_t
request_free_memory,
- RuntimeProfile*
profile, bool is_minor_gc) {
+int64_t MemoryReclamation::tg_enable_overcommit_group_gc(int64_t
request_free_memory,
+ RuntimeProfile*
profile,
+ bool is_minor_gc) {
MonotonicStopWatch watch;
watch.start();
std::vector<WorkloadGroupPtr> task_groups;
diff --git a/be/src/runtime/memory/memory_arbitrator.h b/be/src/runtime/memory/memory_reclamation.h
similarity index 98%
rename from be/src/runtime/memory/memory_arbitrator.h
rename to be/src/runtime/memory/memory_reclamation.h
index 2a936b8ba05..08532671e95 100644
--- a/be/src/runtime/memory/memory_arbitrator.h
+++ b/be/src/runtime/memory/memory_reclamation.h
@@ -21,7 +21,7 @@
namespace doris {
-class MemoryArbitrator {
+class MemoryReclamation {
public:
static bool process_minor_gc(
std::string mem_info =
diff --git a/be/src/vec/sink/writer/vtablet_writer.cpp b/be/src/vec/sink/writer/vtablet_writer.cpp
index 60bd4eafa8a..487bd60b838 100644
--- a/be/src/vec/sink/writer/vtablet_writer.cpp
+++ b/be/src/vec/sink/writer/vtablet_writer.cpp
@@ -62,7 +62,7 @@
#include "exec/tablet_info.h"
#include "runtime/descriptors.h"
#include "runtime/exec_env.h"
-#include "runtime/memory/memory_arbitrator.h"
+#include "runtime/memory/memory_reclamation.h"
#include "runtime/runtime_state.h"
#include "runtime/thread_context.h"
#include "service/backend_options.h"
@@ -555,7 +555,7 @@ Status VNodeChannel::add_block(vectorized::Block* block, const Payload* payload)
int VNodeChannel::try_send_and_fetch_status(RuntimeState* state,
std::unique_ptr<ThreadPoolToken>&
thread_pool_token) {
DBUG_EXECUTE_IF("VNodeChannel.try_send_and_fetch_status_full_gc",
- { MemoryArbitrator::process_full_gc(); });
+ { MemoryReclamation::process_full_gc(); });
if (_cancelled || _send_finished) { // not run
return 0;
@@ -876,7 +876,7 @@ void VNodeChannel::cancel(const std::string& cancel_msg) {
}
Status VNodeChannel::close_wait(RuntimeState* state) {
- DBUG_EXECUTE_IF("VNodeChannel.close_wait_full_gc", {
MemoryArbitrator::process_full_gc(); });
+ DBUG_EXECUTE_IF("VNodeChannel.close_wait_full_gc", {
MemoryReclamation::process_full_gc(); });
SCOPED_CONSUME_MEM_TRACKER(_node_channel_tracker.get());
// set _is_closed to true finally
Defer set_closed {[&]() {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]