This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch dev-1.1.2
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/dev-1.1.2 by this push:
new 91a9aafc01 [dev-1.1.2](cherry-pick ) Optimize the return msg of
process memory limit exceed #12101
91a9aafc01 is described below
commit 91a9aafc01fbb714a98a4dfa569d9c4c057b4b99
Author: Xinyi Zou <[email protected]>
AuthorDate: Fri Aug 26 15:54:32 2022 +0800
[dev-1.1.2](cherry-pick ) Optimize the return msg of process memory limit
exceed #12101
---
be/src/common/config.h | 4 +-
be/src/runtime/bufferpool/buffer_allocator.cc | 2 +-
be/src/runtime/mem_pool.cpp | 1 +
be/src/runtime/memory/chunk_allocator.cpp | 14 ++++-
be/src/runtime/memory/mem_tracker_limiter.cpp | 81 +++++++++++++++++++--------
be/src/runtime/memory/mem_tracker_limiter.h | 32 ++++++-----
6 files changed, 94 insertions(+), 40 deletions(-)
diff --git a/be/src/common/config.h b/be/src/common/config.h
index 9533beff65..aa0b55bc0b 100644
--- a/be/src/common/config.h
+++ b/be/src/common/config.h
@@ -402,6 +402,8 @@ CONF_Int32(min_buffer_size, "1024"); // 1024, The minimum
read buffer size (in b
// With 1024B through 8MB buffers, this is up to ~2GB of buffers.
CONF_Int32(max_free_io_buffers, "128");
+// Whether to disable the memory cache pool,
+// including MemPool, ChunkAllocator, BufferPool, DiskIO free buffer.
CONF_Bool(disable_mem_pools, "false");
// Whether to allocate chunk using mmap. If you enable this, you'd better to
@@ -417,7 +419,7 @@ CONF_Bool(use_mmap_allocate_chunk, "false");
// must larger than 0. and if larger than physical memory size, it will be set
to physical memory size.
// increase this variable can improve performance,
// but will acquire more free memory which can not be used by other modules.
-CONF_mString(chunk_reserved_bytes_limit, "20%");
+CONF_mString(chunk_reserved_bytes_limit, "2147483648");
// 1024, The minimum chunk allocator size (in bytes)
CONF_Int32(min_chunk_reserved_bytes, "1024");
diff --git a/be/src/runtime/bufferpool/buffer_allocator.cc
b/be/src/runtime/bufferpool/buffer_allocator.cc
index a3bbe4c6c2..55a4de1076 100644
--- a/be/src/runtime/bufferpool/buffer_allocator.cc
+++ b/be/src/runtime/bufferpool/buffer_allocator.cc
@@ -238,7 +238,7 @@ Status
BufferPool::BufferAllocator::AllocateInternal(int64_t len, BufferHandle*
}
if (UNLIKELY(len > system_bytes_limit_)) {
err_stream << "Tried to allocate buffer of " << len << " bytes"
- << " > buffer pool limit of " << MAX_BUFFER_BYTES << "
bytes";
+ << " > buffer pool limit of " << system_bytes_limit_ << "
bytes";
return Status::InternalError(err_stream.str());
}
diff --git a/be/src/runtime/mem_pool.cpp b/be/src/runtime/mem_pool.cpp
index 8518347c18..f108f2266b 100644
--- a/be/src/runtime/mem_pool.cpp
+++ b/be/src/runtime/mem_pool.cpp
@@ -124,6 +124,7 @@ bool MemPool::find_chunk(size_t min_size, bool
check_limits) {
if (config::disable_mem_pools) {
// Disable pooling by sizing the chunk to fit only this allocation.
// Make sure the alignment guarantees are respected.
+ // This will generate too many small chunks.
chunk_size = std::max<size_t>(min_size, alignof(max_align_t));
} else {
DCHECK_GE(next_chunk_size_, INITIAL_CHUNK_SIZE);
diff --git a/be/src/runtime/memory/chunk_allocator.cpp
b/be/src/runtime/memory/chunk_allocator.cpp
index 9d5cbfd592..6f22031006 100644
--- a/be/src/runtime/memory/chunk_allocator.cpp
+++ b/be/src/runtime/memory/chunk_allocator.cpp
@@ -33,6 +33,11 @@
namespace doris {
+// <= MIN_CHUNK_SIZE, A large number of small chunks will waste extra storage
and increase lock time.
+static constexpr size_t MIN_CHUNK_SIZE = 4096; // 4K
+// >= MAX_CHUNK_SIZE, Large chunks may not be used for a long time, wasting
memory.
+static constexpr size_t MAX_CHUNK_SIZE = 64 * (1ULL << 20); // 64M
+
ChunkAllocator* ChunkAllocator::_s_instance = nullptr;
DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(chunk_pool_local_core_alloc_count,
MetricUnit::NOUNIT);
@@ -176,14 +181,19 @@ bool ChunkAllocator::allocate(size_t size, Chunk* chunk) {
}
void ChunkAllocator::free(const Chunk& chunk) {
- if (chunk.core_id == -1) {
+ DCHECK(chunk.core_id != -1);
+ CHECK((chunk.size & (chunk.size - 1)) == 0);
+ if (config::disable_mem_pools) {
+ SystemAllocator::free(chunk.data, chunk.size);
return;
}
+
int64_t old_reserved_bytes = _reserved_bytes;
int64_t new_reserved_bytes = 0;
do {
new_reserved_bytes = old_reserved_bytes + chunk.size;
- if (new_reserved_bytes > _reserve_bytes_limit) {
+ if (chunk.size <= MIN_CHUNK_SIZE || chunk.size >= MAX_CHUNK_SIZE ||
+ new_reserved_bytes > _reserve_bytes_limit) {
int64_t cost_ns = 0;
{
SCOPED_RAW_TIMER(&cost_ns);
diff --git a/be/src/runtime/memory/mem_tracker_limiter.cpp
b/be/src/runtime/memory/mem_tracker_limiter.cpp
index 64f19873c2..4b268858b9 100644
--- a/be/src/runtime/memory/mem_tracker_limiter.cpp
+++ b/be/src/runtime/memory/mem_tracker_limiter.cpp
@@ -149,8 +149,9 @@ bool MemTrackerLimiter::gc_memory(int64_t max_consumption) {
Status MemTrackerLimiter::try_gc_memory(int64_t bytes) {
if (UNLIKELY(gc_memory(_limit - bytes))) {
return Status::MemoryLimitExceeded(fmt::format(
- "need_size={}, exceeded_tracker={}, limit={}, peak_used={},
current_used={}", bytes,
- label(), _limit, _consumption->value(),
_consumption->current_value()));
+ "failed_alloc_size={}Bytes, exceeded_tracker={}, limit={}B,
peak_used={}B, "
+ "current_used={}B",
+ bytes, label(), _limit, _consumption->value(),
_consumption->current_value()));
}
VLOG_NOTICE << "GC succeeded, TryConsume bytes=" << bytes
<< " consumption=" << _consumption->current_value() << "
limit=" << _limit;
@@ -223,22 +224,25 @@ std::string MemTrackerLimiter::log_usage(int
max_recursive_depth,
return join(usage_strings, "\n");
}
-Status MemTrackerLimiter::mem_limit_exceeded_log(const std::string& msg) {
- DCHECK(_limit != -1);
+Status MemTrackerLimiter::mem_limit_exceeded_construct(const std::string& msg)
{
std::string detail = fmt::format(
- "{}, backend={} memory used={}, free memory left={}. If is query,
can change the limit "
+ "{}, backend {} process memory used {}, process limit {}. If is
query, can "
+ "change the limit "
"by `set exec_mem_limit=xxx`, details mem usage see be.INFO.",
msg, BackendOptions::get_localhost(),
PrettyPrinter::print(PerfCounters::get_vm_rss(), TUnit::BYTES),
- PrettyPrinter::print(MemInfo::mem_limit() -
PerfCounters::get_vm_rss(), TUnit::BYTES));
- Status status = Status::MemoryLimitExceeded(detail);
+ PrettyPrinter::print(MemInfo::mem_limit(), TUnit::BYTES));
+ return Status::MemoryLimitExceeded(detail);
+}
+void MemTrackerLimiter::print_log_usage(const std::string& msg) {
// only print the tracker log_usage in be log.
+ std::string detail = msg;
if (_print_log_usage) {
if (_label == "Process") {
// Dumping the process MemTracker is expensive. Limiting the
recursive depth to two
// levels limits the level of detail to a one-line summary for
each query MemTracker.
- detail += "\n" +
ExecEnv::GetInstance()->new_process_mem_tracker()->log_usage(2);
+ detail += "\n" + log_usage(2);
} else {
detail += "\n" + log_usage();
}
@@ -246,37 +250,66 @@ Status MemTrackerLimiter::mem_limit_exceeded_log(const
std::string& msg) {
LOG(WARNING) << detail;
_print_log_usage = false;
}
- return status;
}
Status MemTrackerLimiter::mem_limit_exceeded(const std::string& msg,
int64_t failed_allocation_size) {
STOP_CHECK_THREAD_MEM_TRACKER_LIMIT();
DCHECK(!_limited_ancestors.empty());
- std::string detail = fmt::format("memory limit
exceeded:<consumed_tracker={}, ", _label);
+ std::string detail = fmt::format("Memory limit exceeded,
<consuming_tracker={}, ", _label);
if (failed_allocation_size != 0)
- detail += fmt::format("need_size={}, ",
+ detail += fmt::format("failed_alloc_size={}Bytes, ",
PrettyPrinter::print(failed_allocation_size,
TUnit::BYTES));
- MemTrackerLimiter* exceeded_tracker = this;
- int64_t free_size = INT_MAX;
+ MemTrackerLimiter* exceeded_tracker = nullptr;
+ MemTrackerLimiter* max_consumption_tracker = nullptr;
+ int64_t free_size = INT64_MAX;
+ // Find the tracker that exceed limit and has the least free.
for (const auto& tracker : _limited_ancestors) {
+ if (tracker->label() == "Process") continue;
int64_t max_consumption = tracker->peak_consumption() >
tracker->consumption()
? tracker->peak_consumption()
: tracker->consumption();
- if (tracker->has_limit() && tracker->limit() < max_consumption +
failed_allocation_size) {
+ if (tracker->limit() < max_consumption + failed_allocation_size) {
exceeded_tracker = tracker;
break;
}
- if (tracker->has_limit() && tracker->limit() - max_consumption <
free_size) {
+ if (tracker->limit() - max_consumption < free_size) {
free_size = tracker->limit() - max_consumption;
- exceeded_tracker = tracker;
+ max_consumption_tracker = tracker;
}
}
- detail += fmt::format(
- "exceeded_tracker={}, limit={}, peak_used={}, current_used={}>,
executing_msg:<{}>",
- exceeded_tracker->label(), exceeded_tracker->limit(),
- exceeded_tracker->peak_consumption(),
exceeded_tracker->consumption(), msg);
- return exceeded_tracker->mem_limit_exceeded_log(detail);
+
+ auto sys_exceed_st = check_sys_mem_info(failed_allocation_size);
+ MemTrackerLimiter* print_log_usage_tracker = nullptr;
+ if (exceeded_tracker != nullptr) {
+ detail += fmt::format(
+ "exceeded_tracker={}, limit={}B, peak_used={}B,
current_used={B}>, "
+ "executing_msg:<{}>",
+ exceeded_tracker->label(), exceeded_tracker->limit(),
+ exceeded_tracker->peak_consumption(),
exceeded_tracker->consumption(), msg);
+ print_log_usage_tracker = exceeded_tracker;
+ } else if (!sys_exceed_st) {
+ detail = fmt::format("Memory limit exceeded, {}, executing_msg:<{}>",
+ sys_exceed_st.get_error_msg(), msg);
+ } else if (max_consumption_tracker != nullptr) {
+ // must after check_sys_mem_info false
+ detail += fmt::format(
+ "max_consumption_tracker={}, limit={}B, peak_used={}B,
current_used={}B>, "
+ "executing_msg:<{}>",
+ max_consumption_tracker->label(),
max_consumption_tracker->limit(),
+ max_consumption_tracker->peak_consumption(),
max_consumption_tracker->consumption(),
+ msg);
+ print_log_usage_tracker = max_consumption_tracker;
+ } else {
+ // The limit of the current tracker and parents is less than 0, the
consume will not fail,
+ // and the current process memory has no excess limit.
+ detail += fmt::format("unknown exceed reason, executing_msg:<{}>",
msg);
+ print_log_usage_tracker =
ExecEnv::GetInstance()->new_process_mem_tracker().get();
+ }
+ auto st = MemTrackerLimiter::mem_limit_exceeded_construct(detail);
+ if (print_log_usage_tracker != nullptr)
+ print_log_usage_tracker->print_log_usage(st.get_error_msg());
+ return st;
}
Status MemTrackerLimiter::mem_limit_exceeded(const std::string& msg,
@@ -284,9 +317,11 @@ Status MemTrackerLimiter::mem_limit_exceeded(const
std::string& msg,
Status failed_try_consume_st) {
STOP_CHECK_THREAD_MEM_TRACKER_LIMIT();
std::string detail =
- fmt::format("memory limit exceeded:<consumed_tracker={}, {}>,
executing_msg:<{}>",
+ fmt::format("memory limit exceeded:<consuming_tracker={}, {}>,
executing_msg:<{}>",
_label, failed_try_consume_st.get_error_msg(), msg);
- return failed_tracker->mem_limit_exceeded_log(detail);
+ auto st = MemTrackerLimiter::mem_limit_exceeded_construct(detail);
+ failed_tracker->print_log_usage(st.get_error_msg());
+ return st;
}
Status MemTrackerLimiter::mem_limit_exceeded(RuntimeState* state, const
std::string& msg,
diff --git a/be/src/runtime/memory/mem_tracker_limiter.h
b/be/src/runtime/memory/mem_tracker_limiter.h
index 885acffc11..1933a15fc5 100644
--- a/be/src/runtime/memory/mem_tracker_limiter.h
+++ b/be/src/runtime/memory/mem_tracker_limiter.h
@@ -70,9 +70,12 @@ public:
// of the process malloc.
// for fast, expect MemInfo::initialized() to be true.
if (PerfCounters::get_vm_rss() + bytes >= MemInfo::mem_limit()) {
- return Status::MemoryLimitExceeded(fmt::format(
- "{}: TryConsume failed, bytes={} process whole
consumption={} mem limit={}",
- _label, bytes, MemInfo::current_mem(),
MemInfo::mem_limit()));
+ auto st = Status::MemoryLimitExceeded(
+ fmt::format("Memory limit exceeded, process memory used {}
exceed limit {}, "
+ "consuming_tracker={}, failed_alloc_size={}",
+ PerfCounters::get_vm_rss(), MemInfo::mem_limit(), _label,
bytes));
+
ExecEnv::GetInstance()->new_process_mem_tracker()->print_log_usage(st.get_error_msg());
+ return st;
}
return Status::OK();
}
@@ -195,7 +198,8 @@ private:
const std::list<MemTrackerLimiter*>& trackers,
int64_t* logged_consumption);
- Status mem_limit_exceeded_log(const std::string& msg);
+ static Status mem_limit_exceeded_construct(const std::string& msg);
+ void print_log_usage(const std::string& msg);
private:
// Limit on memory consumption, in bytes. If limit_ == -1, there is no
consumption limit. Used in log_usage。
@@ -278,7 +282,9 @@ inline Status MemTrackerLimiter::try_consume(int64_t bytes)
{
// Walk the tracker tree top-down.
for (i = _all_ancestors.size() - 1; i >= 0; --i) {
MemTrackerLimiter* tracker = _all_ancestors[i];
- if (tracker->limit() < 0) {
+ // Process tracker does not participate in the process memory limit,
process tracker consumption is virtual memory,
+ // and there is a diff between the real physical memory value of the
process. It is replaced by check_sys_mem_info.
+ if (tracker->limit() < 0 || _label == "Process") {
tracker->_consumption->add(bytes); // No limit at this tracker.
} else {
// If TryConsume fails, we can try to GC, but we may need to try
several times if
@@ -307,14 +313,14 @@ inline Status MemTrackerLimiter::check_limit(int64_t
bytes) {
RETURN_IF_ERROR(check_sys_mem_info(bytes));
int i;
// Walk the tracker tree top-down.
- for (i = _all_ancestors.size() - 1; i >= 0; --i) {
- MemTrackerLimiter* tracker = _all_ancestors[i];
- if (tracker->limit() > 0) {
- while (true) {
- if (LIKELY(tracker->_consumption->current_value() + bytes <
tracker->limit()))
- break;
- RETURN_IF_ERROR(tracker->try_gc_memory(bytes));
- }
+ for (i = _limited_ancestors.size() - 1; i >= 0; --i) {
+ MemTrackerLimiter* tracker = _limited_ancestors[i];
+ // Process tracker does not participate in the process memory limit,
process tracker consumption is virtual memory,
+ // and there is a diff between the real physical memory value of the
process. It is replaced by check_sys_mem_info.
+ if (tracker->label() == "Process") continue;
+ while (true) {
+ if (LIKELY(tracker->_consumption->current_value() + bytes <
tracker->limit())) break;
+ RETURN_IF_ERROR(tracker->try_gc_memory(bytes));
}
}
return Status::OK();
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]