This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new d5fa66d9a3 [Enhancement] [Memory] Limit memory usage use process
actual physical memory (#10924)
d5fa66d9a3 is described below
commit d5fa66d9a3df6bd867743144ac542c20a87ec06c
Author: Xinyi Zou <[email protected]>
AuthorDate: Tue Jul 19 11:08:39 2022 +0800
[Enhancement] [Memory] Limit memory usage use process actual physical
memory (#10924)
---
be/src/http/action/compaction_action.h | 9 +--
be/src/olap/rowset/segment_v2/segment.cpp | 5 +-
be/src/runtime/exec_env_init.cpp | 1 -
be/src/runtime/mem_tracker.cpp | 1 +
be/src/runtime/mem_tracker.h | 7 ++-
be/src/runtime/tablets_channel.h | 2 +-
be/src/service/doris_main.cpp | 5 +-
be/src/util/CMakeLists.txt | 2 +-
be/src/util/mem_info.h | 2 +
be/src/util/perf_counters.cpp | 97 ++++++++++++++++++++++++++-----
be/src/util/perf_counters.h | 22 +++++++
11 files changed, 124 insertions(+), 29 deletions(-)
diff --git a/be/src/http/action/compaction_action.h
b/be/src/http/action/compaction_action.h
index 80b11bb436..943db1a85a 100644
--- a/be/src/http/action/compaction_action.h
+++ b/be/src/http/action/compaction_action.h
@@ -39,12 +39,7 @@ const std::string PARAM_COMPACTION_CUMULATIVE = "cumulative";
/// See compaction-action.md for details.
class CompactionAction : public HttpHandler {
public:
- CompactionAction(CompactionActionType type) : _type(type) {
- _compaction_mem_tracker =
- type == RUN_COMPACTION ? MemTracker::create_tracker(-1,
"ManualCompaction", nullptr,
-
MemTrackerLevel::VERBOSE)
- : nullptr;
- }
+ CompactionAction(CompactionActionType type) : _type(type) {}
virtual ~CompactionAction() {}
@@ -75,8 +70,6 @@ private:
static std::mutex _compaction_running_mutex;
/// whether there is manual compaction running
static bool _is_compaction_running;
- /// memory tracker
- std::shared_ptr<MemTracker> _compaction_mem_tracker;
};
} // end namespace doris
diff --git a/be/src/olap/rowset/segment_v2/segment.cpp
b/be/src/olap/rowset/segment_v2/segment.cpp
index f3922752d8..f4e96833d3 100644
--- a/be/src/olap/rowset/segment_v2/segment.cpp
+++ b/be/src/olap/rowset/segment_v2/segment.cpp
@@ -48,9 +48,10 @@ Status Segment::open(io::FileSystem* fs, const std::string&
path, uint32_t segme
Segment::Segment(uint32_t segment_id, const TabletSchema* tablet_schema)
: _segment_id(segment_id), _tablet_schema(*tablet_schema) {
#ifndef BE_TEST
- _mem_tracker = StorageEngine::instance()->tablet_mem_tracker();
+ _mem_tracker = MemTracker::create_virtual_tracker(
+ -1, "Segment", StorageEngine::instance()->tablet_mem_tracker());
#else
- _mem_tracker = MemTracker::get_process_tracker();
+ _mem_tracker = MemTracker::create_virtual_tracker(-1, "Segment");
#endif
}
diff --git a/be/src/runtime/exec_env_init.cpp b/be/src/runtime/exec_env_init.cpp
index 6d62e0a258..e82473b01f 100644
--- a/be/src/runtime/exec_env_init.cpp
+++ b/be/src/runtime/exec_env_init.cpp
@@ -183,7 +183,6 @@ Status ExecEnv::_init_mem_tracker() {
<< ". Using physical memory instead";
global_memory_limit_bytes = MemInfo::physical_mem();
}
- MemTracker::get_process_tracker()->set_limit(global_memory_limit_bytes);
_query_pool_mem_tracker = MemTracker::create_tracker(
-1, "QueryPool", MemTracker::get_process_tracker(),
MemTrackerLevel::OVERVIEW);
REGISTER_HOOK_METRIC(query_mem_consumption,
diff --git a/be/src/runtime/mem_tracker.cpp b/be/src/runtime/mem_tracker.cpp
index 71e89f8b1e..ffa7ae188b 100644
--- a/be/src/runtime/mem_tracker.cpp
+++ b/be/src/runtime/mem_tracker.cpp
@@ -39,6 +39,7 @@ const std::string MemTracker::COUNTER_NAME =
"PeakMemoryUsage";
// The ancestor for all trackers. Every tracker is visible from the process
down.
// All manually created trackers should specify the process tracker as the
parent.
+// The process tracker does not limit total memory; it is only used to track
the virtual memory of the process.
static std::shared_ptr<MemTracker> process_tracker;
static MemTracker* raw_process_tracker;
static GoogleOnceType process_tracker_once = GOOGLE_ONCE_INIT;
diff --git a/be/src/runtime/mem_tracker.h b/be/src/runtime/mem_tracker.h
index e69f6c80ad..5b51ae30e2 100644
--- a/be/src/runtime/mem_tracker.h
+++ b/be/src/runtime/mem_tracker.h
@@ -29,6 +29,7 @@
#include "common/config.h"
#include "common/status.h"
#include "util/mem_info.h"
+#include "util/perf_counters.h"
#include "util/runtime_profile.h"
#include "util/spinlock.h"
@@ -112,7 +113,11 @@ public:
static std::shared_ptr<MemTracker> get_temporary_mem_tracker(const
std::string& label);
Status check_sys_mem_info(int64_t bytes) {
- if (MemInfo::initialized() && MemInfo::current_mem() + bytes >=
MemInfo::mem_limit()) {
+ // Limit process memory usage using the actual physical memory of the
process in `/proc/self/status`.
+ // This is independent of the consumption value of the mem tracker,
which counts the virtual memory
+ // of the process malloc.
+ // For speed, MemInfo::initialized() is not checked here; it is expected to be true.
+ if (PerfCounters::get_vm_rss() + bytes >= MemInfo::mem_limit()) {
return Status::MemoryLimitExceeded(
"{}: TryConsume failed, bytes={} process whole
consumption={} mem limit={}",
_label, bytes, MemInfo::current_mem(),
MemInfo::mem_limit());
diff --git a/be/src/runtime/tablets_channel.h b/be/src/runtime/tablets_channel.h
index 5ad1b47027..69897544ca 100644
--- a/be/src/runtime/tablets_channel.h
+++ b/be/src/runtime/tablets_channel.h
@@ -171,7 +171,7 @@ Status TabletsChannel::_get_current_seq(int64_t& cur_seq,
const Request& request
template <typename TabletWriterAddRequest, typename TabletWriterAddResult>
Status TabletsChannel::add_batch(const TabletWriterAddRequest& request,
TabletWriterAddResult* response) {
- SCOPED_SWITCH_TASK_THREAD_LOCAL_MEM_TRACKER(_mem_tracker);
+ SCOPED_SWITCH_THREAD_LOCAL_MEM_TRACKER(_mem_tracker);
int64_t cur_seq = 0;
auto status = _get_current_seq(cur_seq, request);
diff --git a/be/src/service/doris_main.cpp b/be/src/service/doris_main.cpp
index c746f07b76..af65cab0e8 100644
--- a/be/src/service/doris_main.cpp
+++ b/be/src/service/doris_main.cpp
@@ -58,6 +58,7 @@
#include "util/debug_util.h"
#include "util/doris_metrics.h"
#include "util/logging.h"
+#include "util/perf_counters.h"
#include "util/telemetry/telemetry.h"
#include "util/thrift_rpc_helper.h"
#include "util/thrift_server.h"
@@ -478,11 +479,13 @@ int main(int argc, char** argv) {
!defined(USE_JEMALLOC)
doris::MemInfo::refresh_current_mem();
#endif
+ doris::PerfCounters::refresh_proc_status();
+
// TODO(zxy) 10s is too long to clear the expired task mem tracker.
// A query mem tracker is about 57 bytes, assuming 10000 qps, which
wastes about 55M of memory.
// It should be actively triggered at the end of query/load.
doris::ExecEnv::GetInstance()->task_pool_mem_tracker_registry()->logout_task_mem_tracker();
- sleep(10);
+ sleep(1);
}
http_service.stop();
diff --git a/be/src/util/CMakeLists.txt b/be/src/util/CMakeLists.txt
index ae31a7550e..b2523eeeab 100644
--- a/be/src/util/CMakeLists.txt
+++ b/be/src/util/CMakeLists.txt
@@ -47,7 +47,7 @@ set(UTIL_FILES
parse_util.cpp
path_builder.cpp
# TODO: not supported on RHEL 5
-# perf-counters.cpp
+ perf_counters.cpp
progress_updater.cpp
runtime_profile.cpp
static_asserts.cpp
diff --git a/be/src/util/mem_info.h b/be/src/util/mem_info.h
index 7827271604..7ce77301de 100644
--- a/be/src/util/mem_info.h
+++ b/be/src/util/mem_info.h
@@ -46,6 +46,8 @@ public:
static inline size_t current_mem() { return _s_current_mem; }
+ // Tcmalloc property `generic.total_physical_bytes` records the total
length of the virtual memory
+ // obtained by the process malloc, not the physical memory actually used
by the process in the OS.
static inline void refresh_current_mem() {
MallocExtension::instance()->GetNumericProperty("generic.total_physical_bytes",
&_s_current_mem);
diff --git a/be/src/util/perf_counters.cpp b/be/src/util/perf_counters.cpp
index 30dd7ea763..2e9d51d56c 100644
--- a/be/src/util/perf_counters.cpp
+++ b/be/src/util/perf_counters.cpp
@@ -26,12 +26,17 @@
#include <string.h>
#include <sys/syscall.h>
+#include <boost/algorithm/string/trim.hpp>
#include <fstream>
#include <iomanip>
#include <iostream>
#include <sstream>
+#include "gutil/strings/substitute.h"
#include "util/debug_util.h"
+#include "util/pretty_printer.h"
+#include "util/string_parser.hpp"
+#include "util/string_util.h"
namespace doris {
@@ -39,6 +44,8 @@ namespace doris {
#define BUFFER_SIZE 256
#define PRETTY_PRINT_WIDTH 13
+static std::unordered_map<std::string, std::string> _process_state;
+
// This is the order of the counters in /proc/self/io
enum PERF_IO_IDX {
PROC_IO_READ = 0,
@@ -126,7 +133,7 @@ static bool init_event_attr(perf_event_attr* attr,
PerfCounters::Counter counter
return true;
}
-static string get_counter_name(PerfCounters::Counter counter) {
+static std::string get_counter_name(PerfCounters::Counter counter) {
switch (counter) {
case PerfCounters::PERF_COUNTER_SW_CPU_CLOCK:
return "CPUTime";
@@ -278,7 +285,7 @@ bool PerfCounters::init_proc_self_status_counter(Counter
counter) {
return true;
}
-bool PerfCounters::get_sys_counters(vector<int64_t>& buffer) {
+bool PerfCounters::get_sys_counters(std::vector<int64_t>& buffer) {
for (int i = 0; i < _counters.size(); i++) {
if (_counters[i].source == SYS_PERF_COUNTER) {
int num_bytes = read(_counters[i].fd, &buffer[i], COUNTER_SIZE);
@@ -306,7 +313,7 @@ bool PerfCounters::get_sys_counters(vector<int64_t>&
buffer) {
// read_bytes: 0
// write_bytes: 0
// cancelled_write_bytes: 0
-bool PerfCounters::get_proc_self_io_counters(vector<int64_t>& buffer) {
+bool PerfCounters::get_proc_self_io_counters(std::vector<int64_t>& buffer) {
std::ifstream file("/proc/self/io", std::ios::in);
std::string buf;
int64_t values[PROC_IO_LAST_COUNTER];
@@ -346,9 +353,9 @@ bool
PerfCounters::get_proc_self_io_counters(vector<int64_t>& buffer) {
return true;
}
-bool PerfCounters::get_proc_self_status_counters(vector<int64_t>& buffer) {
+bool PerfCounters::get_proc_self_status_counters(std::vector<int64_t>& buffer)
{
std::ifstream file("/proc/self/status", std::ios::in);
- string buf;
+ std::string buf;
while (file) {
getline(file, buf);
@@ -357,13 +364,13 @@ bool
PerfCounters::get_proc_self_status_counters(vector<int64_t>& buffer) {
if (_counters[i].source == PROC_SELF_STATUS) {
size_t field = buf.find(_counters[i].proc_status_field);
- if (field == string::npos) {
+ if (field == std::string::npos) {
continue;
}
size_t colon = field + _counters[i].proc_status_field.size() +
1;
buf = buf.substr(colon + 1);
- istringstream stream(buf);
+ std::istringstream stream(buf);
int64_t value;
stream >> value;
buffer[i] = value * 1024; // values in file are in kb
@@ -458,12 +465,12 @@ bool PerfCounters::add_counter(Counter counter) {
}
// Query all the counters right now and store the values in results
-void PerfCounters::snapshot(const string& name) {
+void PerfCounters::snapshot(const std::string& name) {
if (_counters.size() == 0) {
return;
}
- string fixed_name = name;
+ std::string fixed_name = name;
if (fixed_name.size() == 0) {
std::stringstream ss;
@@ -489,22 +496,22 @@ const std::vector<int64_t>* PerfCounters::counters(int
snapshot) const {
return &_snapshots[snapshot];
}
-void PerfCounters::pretty_print(ostream* s) const {
+void PerfCounters::pretty_print(std::ostream* s) const {
std::ostream& stream = *s;
- std::stream << setw(8) << "snapshot";
+ stream << std::setw(8) << "snapshot";
for (int i = 0; i < _counter_names.size(); ++i) {
- stream << setw(PRETTY_PRINT_WIDTH) << _counter_names[i];
+ stream << std::setw(PRETTY_PRINT_WIDTH) << _counter_names[i];
}
stream << std::endl;
for (int s = 0; s < _snapshots.size(); s++) {
- stream << setw(8) << _snapshot_names[s];
+ stream << std::setw(8) << _snapshot_names[s];
const std::vector<int64_t>& snapshot = _snapshots[s];
for (int i = 0; i < snapshot.size(); ++i) {
- stream << setw(PRETTY_PRINT_WIDTH)
+ stream << std::setw(PRETTY_PRINT_WIDTH)
<< PrettyPrinter::print(snapshot[i], _counters[i].type);
}
@@ -514,4 +521,66 @@ void PerfCounters::pretty_print(ostream* s) const {
stream << std::endl;
}
+// Refactor below
+
+// Looks up `state_key` in the cached process state and converts the stored text with atoi().
+// Returns -1 when the key is not present in the cache.
+int PerfCounters::parse_int(const std::string& state_key) {
+    const auto entry = _process_state.find(state_key);
+    if (entry == _process_state.end()) {
+        return -1;
+    }
+    return atoi(entry->second.c_str());
+}
+
+// Looks up `state_key` in the cached process state and parses the stored text as a
+// 64-bit integer. Returns -1 when the key is missing or when StringParser does not
+// report PARSE_SUCCESS.
+int64_t PerfCounters::parse_int64(const std::string& state_key) {
+    const auto entry = _process_state.find(state_key);
+    if (entry == _process_state.end()) {
+        return -1;
+    }
+    const std::string& text = entry->second;
+    StringParser::ParseResult result;
+    const int64_t state_value =
+            StringParser::string_to_int<int64_t>(text.data(), text.size(), &result);
+    return result == StringParser::PARSE_SUCCESS ? state_value : -1;
+}
+
+// Returns a copy of the cached text stored under `state_key`, or an empty string when
+// the key is not present in the cache.
+std::string PerfCounters::parse_string(const std::string& state_key) {
+    const auto entry = _process_state.find(state_key);
+    return entry != _process_state.end() ? entry->second : std::string();
+}
+
+// Looks up `state_key` in the cached process state and converts the stored text to a
+// byte count. Accepted forms are a bare number (returned unchanged) and a number followed
+// by the unit `kB` (multiplied by 1024). Returns -1 when the key is missing, the value is
+// empty, the number cannot be parsed, or the unit is anything other than `kB`.
+int64_t PerfCounters::parse_bytes(const std::string& state_key) {
+    const auto entry = _process_state.find(state_key);
+    if (entry == _process_state.end()) {
+        return -1;
+    }
+    // We expect state_value such as, e.g., '16129508' or '16129508 kB'.
+    const std::vector<std::string> fields = split(entry->second, " ");
+    // Guard against a cached value that yields no fields; fields[0] below would
+    // otherwise be an out-of-bounds access.
+    if (fields.empty()) {
+        return -1;
+    }
+    StringParser::ParseResult result;
+    const int64_t state_value =
+            StringParser::string_to_int<int64_t>(fields[0].data(), fields[0].size(), &result);
+    if (result != StringParser::PARSE_SUCCESS) {
+        return -1;
+    }
+    if (fields.size() < 2) {
+        return state_value;
+    }
+    if (fields[1] == "kB") {
+        return state_value * 1024L;
+    }
+    // Any other unit suffix is not supported.
+    return -1;
+}
+
+// Re-reads `/proc/self/status` and stores each tab-separated "name / value" line in the
+// process-state cache under the key "status/<name>", with the last character of the name
+// (the ':' separator) removed and the value trimmed. Lines without a tab-separated value
+// or without a field name are skipped.
+// NOTE(review): the cache is written here with no visible synchronization; confirm that
+// readers such as get_vm_rss() cannot run concurrently with this update.
+void PerfCounters::refresh_proc_status() {
+    std::ifstream statusinfo("/proc/self/status", std::ios::in);
+    std::string line;
+    // Loop on the result of getline() so the body only runs for lines that were actually read.
+    while (std::getline(statusinfo, line)) {
+        std::vector<std::string> fields = split(line, "\t");
+        if (fields.size() < 2 || fields[0].empty()) continue;
+        boost::algorithm::trim(fields[1]);
+        const std::string key = fields[0].substr(0, fields[0].size() - 1);
+        _process_state[strings::Substitute("status/$0", key)] = fields[1];
+    }
+    // The stream is closed by its destructor when it goes out of scope.
+}
+
+// Fills `out` with the VmSize, VmPeak and VmRSS values currently held in the process-state
+// cache, each converted by parse_bytes() (so a field is -1 when its entry is unavailable).
+void PerfCounters::get_proc_status(ProcStatus* out) {
+    ProcStatus snapshot;
+    snapshot.vm_size = parse_bytes("status/VmSize");
+    snapshot.vm_peak = parse_bytes("status/VmPeak");
+    snapshot.vm_rss = parse_bytes("status/VmRSS");
+    *out = snapshot;
+}
+
} // namespace doris
diff --git a/be/src/util/perf_counters.h b/be/src/util/perf_counters.h
index adad06c354..c2a4b1c4d9 100644
--- a/be/src/util/perf_counters.h
+++ b/be/src/util/perf_counters.h
@@ -41,6 +41,8 @@
// <do your work>
// counters.snapshot("After Work");
// counters.PrettyPrint(cout);
+//
+// TODO: Expect PerfCounters to be refactored to ProcessState.
namespace doris {
@@ -95,6 +97,26 @@ public:
PerfCounters();
~PerfCounters();
+ // Refactor
+
+ // Memory figures of the current process as filled in by get_proc_status(): each field
+ // is the parse_bytes() result for the matching `/proc/self/status` entry (VmSize, VmPeak,
+ // VmRSS). Fields default to 0 until get_proc_status() overwrites them.
+ struct ProcStatus {
+ int64_t vm_size = 0;
+ int64_t vm_peak = 0;
+ int64_t vm_rss = 0;
+ };
+
+ static int parse_int(const std::string& state_key);
+ static int64_t parse_int64(const std::string& state_key);
+ static std::string parse_string(const std::string& state_key);
+ // Original data's unit is B or KB.
+ static int64_t parse_bytes(const std::string& state_key);
+
+ // Refresh the cached process status info from `/proc/self/status`.
+ static void refresh_proc_status();
+ static void get_proc_status(ProcStatus* out);
+ // Return the process actual physical memory in bytes.
+ static inline int64_t get_vm_rss() { return parse_bytes("status/VmRSS"); }
+
private:
// Copy constructor and assignment not allowed
PerfCounters(const PerfCounters&);
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]