This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new ecbf87d77b [bugfix](memtracker)fix exceed memory limit log (#11485)
ecbf87d77b is described below
commit ecbf87d77b36e345d9cf6deb6a776f2344de3bdc
Author: Xinyi Zou <[email protected]>
AuthorDate: Thu Aug 4 10:22:20 2022 +0800
[bugfix](memtracker)fix exceed memory limit log (#11485)
---
be/src/exec/cross_join_node.cpp | 1 -
be/src/exec/es/es_scroll_parser.cpp | 11 ++---
be/src/exec/except_node.cpp | 1 -
be/src/exec/hash_join_node.cpp | 2 -
be/src/exec/intersect_node.cpp | 1 -
be/src/exec/olap_scanner.cpp | 5 +-
be/src/exec/partitioned_aggregation_node.cc | 10 ++--
be/src/exec/partitioned_hash_table.cc | 7 ++-
be/src/exec/set_operation_node.cpp | 1 -
be/src/exprs/anyval_util.cpp | 6 +--
be/src/exprs/expr_context.cpp | 6 +--
be/src/runtime/mem_pool.h | 43 +++++++----------
be/src/runtime/memory/mem_tracker_limiter.cpp | 59 +++++++++++-------------
be/src/runtime/memory/mem_tracker_limiter.h | 16 +++----
be/src/runtime/memory/mem_tracker_task_pool.cpp | 8 ++--
be/src/runtime/memory/thread_mem_tracker_mgr.cpp | 31 ++++++-------
be/src/runtime/memory/thread_mem_tracker_mgr.h | 41 +++-------------
be/src/runtime/plan_fragment_executor.cpp | 1 +
be/src/runtime/runtime_state.cpp | 4 +-
be/src/runtime/thread_context.cpp | 18 --------
be/src/runtime/thread_context.h | 40 +++++-----------
be/src/vec/exec/join/vhash_join_node.cpp | 1 -
be/src/vec/exec/vaggregation_node.cpp | 2 -
be/src/vec/exec/vcross_join_node.cpp | 1 -
be/src/vec/exec/vset_operation_node.cpp | 1 -
25 files changed, 106 insertions(+), 211 deletions(-)
diff --git a/be/src/exec/cross_join_node.cpp b/be/src/exec/cross_join_node.cpp
index c80560bf82..fe748904f0 100644
--- a/be/src/exec/cross_join_node.cpp
+++ b/be/src/exec/cross_join_node.cpp
@@ -52,7 +52,6 @@ Status CrossJoinNode::close(RuntimeState* state) {
Status CrossJoinNode::construct_build_side(RuntimeState* state) {
// Do a full scan of child(1) and store all build row batches.
RETURN_IF_ERROR(child(1)->open(state));
- SCOPED_UPDATE_MEM_EXCEED_CALL_BACK("Cross join, while getting next from
child 1");
while (true) {
RowBatch* batch =
diff --git a/be/src/exec/es/es_scroll_parser.cpp
b/be/src/exec/es/es_scroll_parser.cpp
index a95af7bb51..7170057ac2 100644
--- a/be/src/exec/es/es_scroll_parser.cpp
+++ b/be/src/exec/es/es_scroll_parser.cpp
@@ -348,12 +348,11 @@ Status ScrollParser::fill_tuple(const TupleDescriptor*
tuple_desc, Tuple* tuple,
// obj[FIELD_ID] must not be nullptr
std::string _id = obj[FIELD_ID].GetString();
size_t len = _id.length();
- Status rst;
- char* buffer =
reinterpret_cast<char*>(tuple_pool->try_allocate_unaligned(len, &rst));
+ char* buffer =
reinterpret_cast<char*>(tuple_pool->try_allocate_unaligned(len));
if (UNLIKELY(buffer == nullptr)) {
std::string details =
strings::Substitute(ERROR_MEM_LIMIT_EXCEEDED,
"MaterializeNextRow", len, "string slot");
- RETURN_LIMIT_EXCEEDED(nullptr, details, len, rst);
+ RETURN_LIMIT_EXCEEDED(nullptr, details, len);
}
memcpy(buffer, _id.data(), len);
reinterpret_cast<StringValue*>(slot)->ptr = buffer;
@@ -407,13 +406,11 @@ Status ScrollParser::fill_tuple(const TupleDescriptor*
tuple_desc, Tuple* tuple,
}
}
size_t val_size = val.length();
- Status rst;
- char* buffer =
-
reinterpret_cast<char*>(tuple_pool->try_allocate_unaligned(val_size, &rst));
+ char* buffer =
reinterpret_cast<char*>(tuple_pool->try_allocate_unaligned(val_size));
if (UNLIKELY(buffer == nullptr)) {
std::string details = strings::Substitute(
ERROR_MEM_LIMIT_EXCEEDED, "MaterializeNextRow",
val_size, "string slot");
- RETURN_LIMIT_EXCEEDED(nullptr, details, val_size, rst);
+ RETURN_LIMIT_EXCEEDED(nullptr, details, val_size);
}
memcpy(buffer, val.data(), val_size);
reinterpret_cast<StringValue*>(slot)->ptr = buffer;
diff --git a/be/src/exec/except_node.cpp b/be/src/exec/except_node.cpp
index 8084fb47f4..8ad7ce9044 100644
--- a/be/src/exec/except_node.cpp
+++ b/be/src/exec/except_node.cpp
@@ -40,7 +40,6 @@ Status ExceptNode::init(const TPlanNode& tnode, RuntimeState*
state) {
Status ExceptNode::open(RuntimeState* state) {
SCOPED_CONSUME_MEM_TRACKER(mem_tracker());
- SCOPED_UPDATE_MEM_EXCEED_CALL_BACK("Except Node, while probing the hash
table.");
RETURN_IF_ERROR(SetOperationNode::open(state));
// if a table is empty, the result must be empty
if (_hash_tbl->size() == 0) {
diff --git a/be/src/exec/hash_join_node.cpp b/be/src/exec/hash_join_node.cpp
index c642335924..b328c4b157 100644
--- a/be/src/exec/hash_join_node.cpp
+++ b/be/src/exec/hash_join_node.cpp
@@ -184,7 +184,6 @@ Status HashJoinNode::construct_hash_table(RuntimeState*
state) {
// The hash join node needs to keep in memory all build tuples, including
the tuple
// row ptrs. The row ptrs are copied into the hash table's internal
structure so they
// don't need to be stored in the _build_pool.
- SCOPED_UPDATE_MEM_EXCEED_CALL_BACK("Hash join, while constructing the hash
table.");
RowBatch build_batch(child(1)->row_desc(), state->batch_size());
RETURN_IF_ERROR(child(1)->open(state));
@@ -301,7 +300,6 @@ Status HashJoinNode::get_next(RuntimeState* state,
RowBatch* out_batch, bool* eo
// In most cases, no additional memory overhead will be applied for at
this stage,
// but if the expression calculation in this node needs to apply for
additional memory,
// it may cause the memory to exceed the limit.
- SCOPED_UPDATE_MEM_EXCEED_CALL_BACK("Hash join, while execute get_next.");
SCOPED_TIMER(_runtime_profile->total_time_counter());
SCOPED_CONSUME_MEM_TRACKER(mem_tracker());
diff --git a/be/src/exec/intersect_node.cpp b/be/src/exec/intersect_node.cpp
index c897945fed..9bf02cedd4 100644
--- a/be/src/exec/intersect_node.cpp
+++ b/be/src/exec/intersect_node.cpp
@@ -44,7 +44,6 @@ Status IntersectNode::init(const TPlanNode& tnode,
RuntimeState* state) {
// repeat [2] this for all the rest child
Status IntersectNode::open(RuntimeState* state) {
SCOPED_CONSUME_MEM_TRACKER(mem_tracker());
- SCOPED_UPDATE_MEM_EXCEED_CALL_BACK("Intersect Node, while probing the hash
table.");
RETURN_IF_ERROR(SetOperationNode::open(state));
// if a table is empty, the result must be empty
if (_hash_tbl->size() == 0) {
diff --git a/be/src/exec/olap_scanner.cpp b/be/src/exec/olap_scanner.cpp
index a2dac28d30..4fe1bff489 100644
--- a/be/src/exec/olap_scanner.cpp
+++ b/be/src/exec/olap_scanner.cpp
@@ -310,10 +310,7 @@ Status OlapScanner::_init_return_columns(bool
need_seq_col) {
Status OlapScanner::get_batch(RuntimeState* state, RowBatch* batch, bool* eof)
{
SCOPED_CONSUME_MEM_TRACKER(_mem_tracker);
// 2. Allocate Row's Tuple buf
- Status st = Status::OK();
- uint8_t* tuple_buf =
- batch->tuple_data_pool()->allocate(_batch_size *
_tuple_desc->byte_size(), &st);
- RETURN_NOT_OK_STATUS_WITH_WARN(st, "Allocate mem for row batch failed");
+ uint8_t* tuple_buf = batch->tuple_data_pool()->allocate(_batch_size *
_tuple_desc->byte_size());
if (tuple_buf == nullptr) {
LOG(WARNING) << "Allocate mem for row batch failed.";
return Status::RuntimeError("Allocate mem for row batch failed.");
diff --git a/be/src/exec/partitioned_aggregation_node.cc
b/be/src/exec/partitioned_aggregation_node.cc
index ad5f6788d9..b508bee322 100644
--- a/be/src/exec/partitioned_aggregation_node.cc
+++ b/be/src/exec/partitioned_aggregation_node.cc
@@ -404,14 +404,13 @@ Status PartitionedAggregationNode::CopyStringData(const
SlotDescriptor& slot_des
Tuple* tuple = batch_iter.get()->get_tuple(0);
StringValue* sv =
reinterpret_cast<StringValue*>(tuple->get_slot(slot_desc.tuple_offset()));
if (sv == nullptr || sv->len == 0) continue;
- Status rst;
- char* new_ptr = reinterpret_cast<char*>(pool->try_allocate(sv->len,
&rst));
+ char* new_ptr = reinterpret_cast<char*>(pool->try_allocate(sv->len));
if (UNLIKELY(new_ptr == nullptr)) {
string details = Substitute(
"Cannot perform aggregation at node with id $0."
" Failed to allocate $1 output bytes.",
_id, sv->len);
- RETURN_LIMIT_EXCEEDED(state_, details, sv->len, rst);
+ RETURN_LIMIT_EXCEEDED(state_, details, sv->len);
}
memcpy(new_ptr, sv->ptr, sv->len);
sv->ptr = new_ptr;
@@ -921,8 +920,7 @@ Tuple*
PartitionedAggregationNode::ConstructIntermediateTuple(
const int fixed_size = intermediate_tuple_desc_->byte_size();
const int varlen_size = GroupingExprsVarlenSize();
const int tuple_data_size = fixed_size + varlen_size;
- Status rst;
- uint8_t* tuple_data = pool->try_allocate(tuple_data_size, &rst);
+ uint8_t* tuple_data = pool->try_allocate(tuple_data_size);
if (UNLIKELY(tuple_data == nullptr)) {
stringstream str;
str << "Memory exceed limit. Cannot perform aggregation at node with
id $0. Failed "
@@ -937,7 +935,7 @@ Tuple*
PartitionedAggregationNode::ConstructIntermediateTuple(
string details = Substitute(str.str(), _id, tuple_data_size);
*status = thread_context()
->_thread_mem_tracker_mgr->limiter_mem_tracker()
- ->mem_limit_exceeded(state_, details,
tuple_data_size, rst);
+ ->mem_limit_exceeded(state_, details,
tuple_data_size);
return nullptr;
}
memset(tuple_data, 0, fixed_size);
diff --git a/be/src/exec/partitioned_hash_table.cc
b/be/src/exec/partitioned_hash_table.cc
index cbcc85070c..b78622e137 100644
--- a/be/src/exec/partitioned_hash_table.cc
+++ b/be/src/exec/partitioned_hash_table.cc
@@ -307,13 +307,12 @@ Status
PartitionedHashTableCtx::ExprValuesCache::Init(RuntimeState* state,
MAX_EXPR_VALUES_ARRAY_SIZE /
expr_values_bytes_per_row_));
int mem_usage = MemUsage(capacity_, expr_values_bytes_per_row_,
num_exprs_);
- Status st =
thread_context()->_thread_mem_tracker_mgr->limiter_mem_tracker()->check_limit(
- mem_usage);
- if (UNLIKELY(!st)) {
+ if
(UNLIKELY(!thread_context()->_thread_mem_tracker_mgr->limiter_mem_tracker()->check_limit(
+ mem_usage))) {
capacity_ = 0;
string details = Substitute(
"PartitionedHashTableCtx::ExprValuesCache failed to allocate
$0 bytes", mem_usage);
- RETURN_LIMIT_EXCEEDED(state, details, mem_usage, st);
+ RETURN_LIMIT_EXCEEDED(state, details, mem_usage);
}
int expr_values_size = expr_values_bytes_per_row_ * capacity_;
diff --git a/be/src/exec/set_operation_node.cpp
b/be/src/exec/set_operation_node.cpp
index 32794051ef..12ba3d373f 100644
--- a/be/src/exec/set_operation_node.cpp
+++ b/be/src/exec/set_operation_node.cpp
@@ -136,7 +136,6 @@ Status SetOperationNode::open(RuntimeState* state) {
RETURN_IF_ERROR(ExecNode::open(state));
SCOPED_TIMER(_runtime_profile->total_time_counter());
SCOPED_CONSUME_MEM_TRACKER(mem_tracker());
- SCOPED_UPDATE_MEM_EXCEED_CALL_BACK("SetOperation, while constructing the
hash table.");
RETURN_IF_CANCELLED(state);
// open result expr lists.
for (const std::vector<ExprContext*>& exprs : _child_expr_lists) {
diff --git a/be/src/exprs/anyval_util.cpp b/be/src/exprs/anyval_util.cpp
index 9d923bb4a7..1c3765578a 100644
--- a/be/src/exprs/anyval_util.cpp
+++ b/be/src/exprs/anyval_util.cpp
@@ -44,11 +44,9 @@ Status allocate_any_val(RuntimeState* state, MemPool* pool,
const TypeDescriptor
const std::string& mem_limit_exceeded_msg, AnyVal**
result) {
const int anyval_size = AnyValUtil::any_val_size(type);
const int anyval_alignment = AnyValUtil::any_val_alignment(type);
- Status rst;
- *result = reinterpret_cast<AnyVal*>(
- pool->try_allocate_aligned(anyval_size, anyval_alignment, &rst));
+ *result =
reinterpret_cast<AnyVal*>(pool->try_allocate_aligned(anyval_size,
anyval_alignment));
if (*result == nullptr) {
- RETURN_LIMIT_EXCEEDED(state, mem_limit_exceeded_msg, anyval_size, rst);
+ RETURN_LIMIT_EXCEEDED(state, mem_limit_exceeded_msg, anyval_size);
}
memset(static_cast<void*>(*result), 0, anyval_size);
return Status::OK();
diff --git a/be/src/exprs/expr_context.cpp b/be/src/exprs/expr_context.cpp
index 3f63c93d2c..c37195a6bb 100644
--- a/be/src/exprs/expr_context.cpp
+++ b/be/src/exprs/expr_context.cpp
@@ -413,11 +413,9 @@ Status ExprContext::get_const_value(RuntimeState* state,
Expr& expr, AnyVal** co
StringVal* sv = reinterpret_cast<StringVal*>(*const_val);
if (!sv->is_null && sv->len > 0) {
// Make sure the memory is owned by this evaluator.
- Status rst;
- char* ptr_copy =
reinterpret_cast<char*>(_pool->try_allocate(sv->len, &rst));
+ char* ptr_copy =
reinterpret_cast<char*>(_pool->try_allocate(sv->len));
if (ptr_copy == nullptr) {
- RETURN_LIMIT_EXCEEDED(state, "Could not allocate constant
string value", sv->len,
- rst);
+ RETURN_LIMIT_EXCEEDED(state, "Could not allocate constant
string value", sv->len);
}
memcpy(ptr_copy, sv->ptr, sv->len);
sv->ptr = reinterpret_cast<uint8_t*>(ptr_copy);
diff --git a/be/src/runtime/mem_pool.h b/be/src/runtime/mem_pool.h
index 8fccacc987..84a1b79a30 100644
--- a/be/src/runtime/mem_pool.h
+++ b/be/src/runtime/mem_pool.h
@@ -102,44 +102,40 @@ public:
/// Allocates a section of memory of 'size' bytes with DEFAULT_ALIGNMENT
at the end
/// of the current chunk. Creates a new chunk if there aren't any chunks
/// with enough capacity.
- uint8_t* allocate(int64_t size, Status* rst = nullptr) {
- return allocate<false>(size, DEFAULT_ALIGNMENT, rst);
- }
+ uint8_t* allocate(int64_t size) { return allocate<false>(size,
DEFAULT_ALIGNMENT); }
- uint8_t* allocate_aligned(int64_t size, int alignment, Status* rst =
nullptr) {
+ uint8_t* allocate_aligned(int64_t size, int alignment) {
DCHECK_GE(alignment, 1);
DCHECK_LE(alignment, config::memory_max_alignment);
DCHECK_EQ(BitUtil::RoundUpToPowerOfTwo(alignment), alignment);
- return allocate<false>(size, alignment, rst);
+ return allocate<false>(size, alignment);
}
/// Same as Allocate() expect add a check when return a nullptr
- Status allocate_safely(int64_t size, uint8_t*& ret, Status* rst = nullptr)
{
- return allocate_safely<false>(size, DEFAULT_ALIGNMENT, ret, rst);
+ Status allocate_safely(int64_t size, uint8_t*& ret) {
+ return allocate_safely<false>(size, DEFAULT_ALIGNMENT, ret);
}
/// Same as Allocate() except the mem limit is checked before the
allocation and
/// this call will fail (returns nullptr) if it does.
/// The caller must handle the nullptr case. This should be used for
allocations
/// where the size can be very big to bound the amount by which we exceed
mem limits.
- uint8_t* try_allocate(int64_t size, Status* rst = nullptr) {
- return allocate<true>(size, DEFAULT_ALIGNMENT, rst);
- }
+ uint8_t* try_allocate(int64_t size) { return allocate<true>(size,
DEFAULT_ALIGNMENT); }
/// Same as TryAllocate() except a non-default alignment can be specified.
It
/// should be a power-of-two in [1, alignof(std::max_align_t)].
- uint8_t* try_allocate_aligned(int64_t size, int alignment, Status* rst =
nullptr) {
+ uint8_t* try_allocate_aligned(int64_t size, int alignment) {
DCHECK_GE(alignment, 1);
DCHECK_LE(alignment, config::memory_max_alignment);
DCHECK_EQ(BitUtil::RoundUpToPowerOfTwo(alignment), alignment);
- return allocate<true>(size, alignment, rst);
+ return allocate<true>(size, alignment);
}
/// Same as TryAllocate() except returned memory is not aligned at all.
- uint8_t* try_allocate_unaligned(int64_t size, Status* rst = nullptr) {
+ uint8_t* try_allocate_unaligned(int64_t size) {
// Call templated implementation directly so that it is inlined here
and the
// alignment logic can be optimised out.
- return allocate<true>(size, 1, rst);
+ return allocate<true>(size, 1);
}
/// Makes all allocated chunks available for re-use, but doesn't delete
any chunks.
@@ -252,7 +248,7 @@ private:
}
template <bool CHECK_LIMIT_FIRST>
- uint8_t* ALWAYS_INLINE allocate(int64_t size, int alignment, Status* rst) {
+ uint8_t* ALWAYS_INLINE allocate(int64_t size, int alignment) {
DCHECK_GE(size, 0);
if (UNLIKELY(size == 0)) return
reinterpret_cast<uint8_t*>(&k_zero_length_region_);
@@ -268,22 +264,15 @@ private:
// guarantee alignment.
//static_assert(
//INITIAL_CHUNK_SIZE >= config::FLAGS_MEMORY_MAX_ALIGNMENT, "Min chunk
size too low");
- if (rst == nullptr) {
- if (UNLIKELY(!find_chunk(size + DEFAULT_PADDING_SIZE,
CHECK_LIMIT_FIRST)))
- return nullptr;
- } else {
- *rst = find_chunk(size + DEFAULT_PADDING_SIZE, CHECK_LIMIT_FIRST);
- if (UNLIKELY(!*rst)) return nullptr;
- }
+ if (UNLIKELY(!find_chunk(size + DEFAULT_PADDING_SIZE,
CHECK_LIMIT_FIRST))) return nullptr;
uint8_t* result = allocate_from_current_chunk(size, alignment);
return result;
}
template <bool CHECK_LIMIT_FIRST>
- Status ALWAYS_INLINE allocate_safely(int64_t size, int alignment,
uint8_t*& ret,
- Status* rst = nullptr) {
- uint8_t* result = allocate<CHECK_LIMIT_FIRST>(size, alignment, rst);
+ Status ALWAYS_INLINE allocate_safely(int64_t size, int alignment,
uint8_t*& ret) {
+ uint8_t* result = allocate<CHECK_LIMIT_FIRST>(size, alignment);
if (result == nullptr) {
return Status::OLAPInternalError(OLAP_ERR_MALLOC_ERROR);
}
@@ -320,6 +309,6 @@ private:
};
// Stamp out templated implementations here so they're included in IR module
-template uint8_t* MemPool::allocate<false>(int64_t size, int alignment,
Status* rst);
-template uint8_t* MemPool::allocate<true>(int64_t size, int alignment, Status*
rst);
+template uint8_t* MemPool::allocate<false>(int64_t size, int alignment);
+template uint8_t* MemPool::allocate<true>(int64_t size, int alignment);
} // namespace doris
diff --git a/be/src/runtime/memory/mem_tracker_limiter.cpp
b/be/src/runtime/memory/mem_tracker_limiter.cpp
index 6a37b1df86..4976c6ae05 100644
--- a/be/src/runtime/memory/mem_tracker_limiter.cpp
+++ b/be/src/runtime/memory/mem_tracker_limiter.cpp
@@ -143,8 +143,8 @@ bool MemTrackerLimiter::gc_memory(int64_t max_consumption) {
Status MemTrackerLimiter::try_gc_memory(int64_t bytes) {
if (UNLIKELY(gc_memory(_limit - bytes))) {
return Status::MemoryLimitExceeded(
- fmt::format("label={} TryConsume failed size={}, used={},
limit={}", label(), bytes,
- _consumption->current_value(), _limit));
+ fmt::format("label={}, limit={}, used={}, failed consume
size={}", label(), _limit,
+ _consumption->current_value(), bytes));
}
VLOG_NOTICE << "GC succeeded, TryConsume bytes=" << bytes
<< " consumption=" << _consumption->current_value() << "
limit=" << _limit;
@@ -197,9 +197,9 @@ std::string MemTrackerLimiter::log_usage(int
max_recursive_depth, int64_t* logge
std::vector<MemTracker::Snapshot> snapshots;
MemTracker::make_group_snapshot(&snapshots, 0, _group_num, _label);
for (const auto& snapshot : snapshots) {
- child_trackers_usage += MemTracker::log_usage(snapshot);
+ child_trackers_usage += "\n " + MemTracker::log_usage(snapshot);
}
- if (!child_trackers_usage.empty()) detail += "\n" + child_trackers_usage;
+ if (!child_trackers_usage.empty()) detail += child_trackers_usage;
return detail;
}
@@ -217,41 +217,38 @@ std::string MemTrackerLimiter::log_usage(int
max_recursive_depth,
return join(usage_strings, "\n");
}
-Status MemTrackerLimiter::mem_limit_exceeded(RuntimeState* state, const
std::string& details,
- int64_t failed_allocation_size,
Status failed_alloc) {
+Status MemTrackerLimiter::mem_limit_exceeded(const std::string& msg, int64_t
failed_consume_size) {
STOP_CHECK_THREAD_MEM_TRACKER_LIMIT();
- std::string detail =
- "Memory exceed limit. fragment={}, details={}, on backend={}.
Memory left in process "
- "limit={}.";
- detail = fmt::format(
- detail, state != nullptr ? print_id(state->fragment_instance_id())
: "", details,
- BackendOptions::get_localhost(),
+ std::string detail = fmt::format(
+ "{}, failed mem consume:<consume_size={}, mem_limit={},
mem_used={}, tracker_label={}, "
+ "in backend={} free memory left={}. details mem usage see
be.INFO.",
+ msg, PrettyPrinter::print(failed_consume_size, TUnit::BYTES),
_limit,
+ _consumption->current_value(), _label,
BackendOptions::get_localhost(),
PrettyPrinter::print(ExecEnv::GetInstance()->process_mem_tracker()->spare_capacity(),
TUnit::BYTES));
- if (!failed_alloc) {
- detail += " failed alloc=<{}>. current tracker={}.";
- detail = fmt::format(detail, failed_alloc.to_string(), _label);
- } else {
- detail += " current tracker <label={}, used={}, limit={}, failed alloc
size={}>.";
- detail = fmt::format(detail, _label, _consumption->current_value(),
_limit,
- PrettyPrinter::print(failed_allocation_size,
TUnit::BYTES));
- }
- detail += " If this is a query, can change the limit by session variable
exec_mem_limit.";
Status status = Status::MemoryLimitExceeded(detail);
- if (state != nullptr) state->log_error(detail);
- detail += "\n" +
boost::stacktrace::to_string(boost::stacktrace::stacktrace());
// only print the tracker log_usage in be log.
- if (ExecEnv::GetInstance()->process_mem_tracker()->spare_capacity() <
failed_allocation_size) {
- // Dumping the process MemTracker is expensive. Limiting the recursive
depth to two
- // levels limits the level of detail to a one-line summary for each
query MemTracker.
- detail += "\n" +
ExecEnv::GetInstance()->process_mem_tracker()->log_usage(2);
- } else {
- detail += "\n" + log_usage();
+ if (_print_log_usage) {
+ if (ExecEnv::GetInstance()->process_mem_tracker()->spare_capacity() <
failed_consume_size) {
+ // Dumping the process MemTracker is expensive. Limiting the
recursive depth to two
+ // levels limits the level of detail to a one-line summary for
each query MemTracker.
+ detail += "\n" +
ExecEnv::GetInstance()->process_mem_tracker()->log_usage(2);
+ } else {
+ detail += "\n" + log_usage();
+ }
+ detail += "\n" +
boost::stacktrace::to_string(boost::stacktrace::stacktrace());
+ LOG(WARNING) << detail;
+ _print_log_usage = false;
}
-
- LOG(WARNING) << detail;
return status;
}
+Status MemTrackerLimiter::mem_limit_exceeded(RuntimeState* state, const
std::string& msg,
+ int64_t failed_consume_size) {
+ Status rt = mem_limit_exceeded(msg, failed_consume_size);
+ if (state != nullptr) state->log_error(rt.to_string()); // state may be nullptr, e.g. RETURN_LIMIT_EXCEEDED(nullptr, ...)
+ return rt;
+}
+
} // namespace doris
diff --git a/be/src/runtime/memory/mem_tracker_limiter.h
b/be/src/runtime/memory/mem_tracker_limiter.h
index c85205b2c0..1a8ba5a149 100644
--- a/be/src/runtime/memory/mem_tracker_limiter.h
+++ b/be/src/runtime/memory/mem_tracker_limiter.h
@@ -139,12 +139,13 @@ public:
std::string log_usage(int max_recursive_depth = INT_MAX, int64_t*
logged_consumption = nullptr);
// Log the memory usage when memory limit is exceeded and return a status
object with
- // details of the allocation which caused the limit to be exceeded.
+ // msg of the allocation which caused the limit to be exceeded.
// If 'failed_allocation_size' is greater than zero, logs the allocation
size. If
// 'failed_allocation_size' is zero, nothing about the allocation size is
logged.
// If 'state' is non-nullptr, logs the error to 'state'.
- Status mem_limit_exceeded(RuntimeState* state, const std::string& details
= std::string(),
- int64_t failed_allocation = -1, Status
failed_alloc = Status::OK());
+ Status mem_limit_exceeded(const std::string& msg, int64_t
failed_consume_size);
+ Status mem_limit_exceeded(RuntimeState* state, const std::string& msg =
std::string(),
+ int64_t failed_consume_size = -1);
std::string debug_string() {
std::stringstream msg;
@@ -204,6 +205,8 @@ private:
// The number of child trackers that have been added.
std::atomic_size_t _had_child_count = 0;
+ std::atomic<bool> _print_log_usage = true; // read/written by mem_limit_exceeded() callers on different threads
+
// Lock to protect gc_memory(). This prevents many GCs from occurring at
once.
std::mutex _gc_lock;
// Functions to call after the limit is reached to free memory.
@@ -280,11 +283,4 @@ inline Status MemTrackerLimiter::check_limit(int64_t
bytes) {
return Status::OK();
}
-#define RETURN_LIMIT_EXCEEDED(state, msg, ...)
\
- return
thread_context()->_thread_mem_tracker_mgr->limiter_mem_tracker()->mem_limit_exceeded(
\
- state, msg, ##__VA_ARGS__);
-#define RETURN_IF_LIMIT_EXCEEDED(state, msg)
\
- if
(thread_context()->_thread_mem_tracker_mgr->limiter_mem_tracker()->any_limit_exceeded())
\
- RETURN_LIMIT_EXCEEDED(state, msg);
-
} // namespace doris
diff --git a/be/src/runtime/memory/mem_tracker_task_pool.cpp
b/be/src/runtime/memory/mem_tracker_task_pool.cpp
index 3c775db5ec..b63b25df17 100644
--- a/be/src/runtime/memory/mem_tracker_task_pool.cpp
+++ b/be/src/runtime/memory/mem_tracker_task_pool.cpp
@@ -31,15 +31,13 @@ std::shared_ptr<MemTrackerLimiter>
MemTrackerTaskPool::register_task_mem_tracker
// Combine new tracker and emplace into one operation to avoid the use of
locks
// Name for task MemTrackers. '$0' is replaced with the task id.
std::shared_ptr<MemTrackerLimiter> tracker;
- bool new_emplace = _task_mem_trackers.lazy_emplace_l(
+ bool new_emplace = _task_mem_trackers.try_emplace_l(
task_id, [&](const std::shared_ptr<MemTrackerLimiter>& v) {
tracker = v; },
- [&](const auto& ctor) {
- tracker = std::make_shared<MemTrackerLimiter>(mem_limit,
label, parent);
- ctor(task_id, tracker);
- });
+ std::make_shared<MemTrackerLimiter>(mem_limit, label, parent));
if (new_emplace) {
LOG(INFO) << "Register query/load memory tracker, query/load id: " <<
task_id
<< " limit: " << PrettyPrinter::print(mem_limit,
TUnit::BYTES);
+ return get_task_mem_tracker(task_id);
}
return tracker;
}
diff --git a/be/src/runtime/memory/thread_mem_tracker_mgr.cpp
b/be/src/runtime/memory/thread_mem_tracker_mgr.cpp
index b6f1ebde33..a3a8dbbc8b 100644
--- a/be/src/runtime/memory/thread_mem_tracker_mgr.cpp
+++ b/be/src/runtime/memory/thread_mem_tracker_mgr.cpp
@@ -25,14 +25,12 @@
namespace doris {
void ThreadMemTrackerMgr::attach_limiter_tracker(
- const std::string& cancel_msg, const std::string& task_id,
- const TUniqueId& fragment_instance_id,
+ const std::string& task_id, const TUniqueId& fragment_instance_id,
const std::shared_ptr<MemTrackerLimiter>& mem_tracker) {
DCHECK(mem_tracker);
flush_untracked_mem<false>();
_task_id = task_id;
_fragment_instance_id = fragment_instance_id;
- _exceed_cb.cancel_msg = cancel_msg;
_limiter_tracker = mem_tracker;
}
@@ -40,7 +38,6 @@ void ThreadMemTrackerMgr::detach_limiter_tracker() {
flush_untracked_mem<false>();
_task_id = "";
_fragment_instance_id = TUniqueId();
- _exceed_cb.cancel_msg = "";
_limiter_tracker = ExecEnv::GetInstance()->process_mem_tracker();
}
@@ -52,20 +49,22 @@ void ThreadMemTrackerMgr::exceeded_cancel_task(const
std::string& cancel_details
}
}
-void ThreadMemTrackerMgr::exceeded(int64_t mem_usage, Status try_consume_st) {
- if (_exceed_cb.cb_func != nullptr) {
- _exceed_cb.cb_func();
+void ThreadMemTrackerMgr::exceeded(int64_t failed_consume_size) {
+ if (_cb_func != nullptr) {
+ _cb_func();
}
- if (is_attach_task()) {
- if (_exceed_cb.cancel_task) {
- auto st = _limiter_tracker->mem_limit_exceeded(
- nullptr,
- fmt::format("Task mem limit exceeded and cancel it,
msg:{}",
- _exceed_cb.cancel_msg),
- mem_usage, try_consume_st);
- exceeded_cancel_task(st.to_string());
- _exceed_cb.cancel_task = false; // Make sure it will only be
canceled once
+ if (is_attach_query()) {
+ std::string cancel_msg;
+ if (!_consumer_tracker_stack.empty()) {
+ cancel_msg = fmt::format(
+ "exec node:<name={}>, can change the limit by `set
exec_mem_limit=xxx`",
+ _consumer_tracker_stack.back()->label()); // innermost consumer; [-1] would index out of bounds
+ } else {
+ cancel_msg = "exec node:unknown, can change the limit by `set
exec_mem_limit=xxx`";
}
+ auto st = _limiter_tracker->mem_limit_exceeded(cancel_msg,
failed_consume_size);
+ exceeded_cancel_task(st.to_string());
+ _check_limit = false; // Make sure it will only be canceled once
+ }
}
} // namespace doris
diff --git a/be/src/runtime/memory/thread_mem_tracker_mgr.h
b/be/src/runtime/memory/thread_mem_tracker_mgr.h
index d3940d1e50..1c9cdfc953 100644
--- a/be/src/runtime/memory/thread_mem_tracker_mgr.h
+++ b/be/src/runtime/memory/thread_mem_tracker_mgr.h
@@ -32,22 +32,6 @@ extern bthread_key_t btls_key;
static const bthread_key_t EMPTY_BTLS_KEY = {0, 0};
using ExceedCallBack = void (*)();
-struct MemExceedCallBackInfo {
- std::string cancel_msg;
- bool cancel_task; // Whether to cancel the task when the current tracker
exceeds the limit.
- ExceedCallBack cb_func;
-
- MemExceedCallBackInfo() { init(); }
-
- MemExceedCallBackInfo(const std::string& cancel_msg, bool cancel_task,
ExceedCallBack cb_func)
- : cancel_msg(cancel_msg), cancel_task(cancel_task),
cb_func(cb_func) {}
-
- void init() {
- cancel_msg = "";
- cancel_task = true;
- cb_func = nullptr;
- }
-};
// TCMalloc new/delete Hook is counted in the memory_tracker of the current
thread.
//
@@ -61,7 +45,6 @@ public:
~ThreadMemTrackerMgr() {
flush_untracked_mem<false>();
- _exceed_cb.init();
DCHECK(_consumer_tracker_stack.empty());
}
@@ -75,8 +58,7 @@ public:
void init();
// After attach, the current thread TCMalloc Hook starts to
consume/release task mem_tracker
- void attach_limiter_tracker(const std::string& cancel_msg, const
std::string& task_id,
- const TUniqueId& fragment_instance_id,
+ void attach_limiter_tracker(const std::string& task_id, const TUniqueId&
fragment_instance_id,
const std::shared_ptr<MemTrackerLimiter>&
mem_tracker);
void detach_limiter_tracker();
@@ -86,16 +68,7 @@ public:
void push_consumer_tracker(MemTracker* mem_tracker);
void pop_consumer_tracker();
- MemExceedCallBackInfo update_exceed_call_back(const std::string&
cancel_msg, bool cancel_task,
- ExceedCallBack cb_func) {
- _temp_exceed_cb = _exceed_cb;
- _exceed_cb.cancel_msg = cancel_msg;
- _exceed_cb.cancel_task = cancel_task;
- _exceed_cb.cb_func = cb_func;
- return _temp_exceed_cb;
- }
-
- void update_exceed_call_back(const MemExceedCallBackInfo& exceed_cb) {
_exceed_cb = exceed_cb; }
+ void set_exceed_call_back(ExceedCallBack cb_func) { _cb_func = cb_func; }
// Note that, If call the memory allocation operation in TCMalloc
new/delete Hook,
// such as calling LOG/iostream/sstream/stringstream/etc. related methods,
@@ -114,7 +87,7 @@ public:
template <bool CheckLimit>
void flush_untracked_mem();
- bool is_attach_task() { return _task_id != ""; }
+ bool is_attach_query() { return _fragment_instance_id != TUniqueId(); }
std::shared_ptr<MemTrackerLimiter> limiter_mem_tracker() { return
_limiter_tracker; }
@@ -138,7 +111,7 @@ private:
// If tryConsume fails due to task mem tracker exceeding the limit, the
task must be canceled
void exceeded_cancel_task(const std::string& cancel_details);
- void exceeded(int64_t mem_usage, Status try_consume_st);
+ void exceeded(int64_t failed_consume_size);
private:
// Cache untracked mem, only update to _untracked_mems when switching mem
tracker.
@@ -155,14 +128,12 @@ private:
bool _check_attach = true;
std::string _task_id;
TUniqueId _fragment_instance_id;
- MemExceedCallBackInfo _exceed_cb;
- MemExceedCallBackInfo _temp_exceed_cb;
+ ExceedCallBack _cb_func = nullptr;
};
inline void ThreadMemTrackerMgr::init() {
DCHECK(_consumer_tracker_stack.empty());
_task_id = "";
- _exceed_cb.init();
_limiter_tracker = ExecEnv::GetInstance()->process_mem_tracker();
_check_limit = true;
}
@@ -219,7 +190,7 @@ inline void ThreadMemTrackerMgr::flush_untracked_mem() {
// The memory has been allocated, so when TryConsume fails, need
to continue to complete
// the consume to ensure the accuracy of the statistics.
_limiter_tracker->consume(_untracked_mem);
- exceeded(_untracked_mem, st);
+ exceeded(_untracked_mem);
}
} else {
_limiter_tracker->consume(_untracked_mem);
diff --git a/be/src/runtime/plan_fragment_executor.cpp
b/be/src/runtime/plan_fragment_executor.cpp
index 5b5aaae861..381649c730 100644
--- a/be/src/runtime/plan_fragment_executor.cpp
+++ b/be/src/runtime/plan_fragment_executor.cpp
@@ -251,6 +251,7 @@ Status PlanFragmentExecutor::open() {
if (_cancel_reason == PPlanFragmentCancelReason::CALL_RPC_ERROR) {
status = Status::RuntimeError(_cancel_msg);
} else if (_cancel_reason ==
PPlanFragmentCancelReason::MEMORY_LIMIT_EXCEED) {
+ // status = Status::MemoryAllocFailed(_cancel_msg);
status = Status::MemoryLimitExceeded(_cancel_msg);
}
}
diff --git a/be/src/runtime/runtime_state.cpp b/be/src/runtime/runtime_state.cpp
index 2d128c13e4..ea7b415622 100644
--- a/be/src/runtime/runtime_state.cpp
+++ b/be/src/runtime/runtime_state.cpp
@@ -351,7 +351,9 @@ Status RuntimeState::set_mem_limit_exceeded(const
std::string& msg) {
Status RuntimeState::check_query_state(const std::string& msg) {
// TODO: it would be nice if this also checked for cancellation, but doing so breaks
// cases where we use Status::Cancelled("Cancelled") to indicate that the limit was reached.
- RETURN_IF_LIMIT_EXCEEDED(this, msg);
+ if
(thread_context()->_thread_mem_tracker_mgr->limiter_mem_tracker()->any_limit_exceeded())
{
+ RETURN_LIMIT_EXCEEDED(this, msg);
+ }
return query_status();
}
diff --git a/be/src/runtime/thread_context.cpp b/be/src/runtime/thread_context.cpp
index 7a6968b30f..cc11c1b29b 100644
--- a/be/src/runtime/thread_context.cpp
+++ b/be/src/runtime/thread_context.cpp
@@ -79,24 +79,6 @@ AddThreadMemTrackerConsumer::~AddThreadMemTrackerConsumer() {
#endif // USE_MEM_TRACKER
}
-UpdateMemExceedCallBack::UpdateMemExceedCallBack(const std::string&
cancel_msg, bool cancel_task,
- ExceedCallBack cb_func) {
-#ifdef USE_MEM_TRACKER
- DCHECK(cancel_msg != std::string());
- _old_cb =
thread_context()->_thread_mem_tracker_mgr->update_exceed_call_back(
- cancel_msg, cancel_task, cb_func);
-#endif
-}
-
-UpdateMemExceedCallBack::~UpdateMemExceedCallBack() {
-#ifdef USE_MEM_TRACKER
-
thread_context()->_thread_mem_tracker_mgr->update_exceed_call_back(_old_cb);
-#ifndef NDEBUG
-
DorisMetrics::instance()->thread_mem_tracker_exceed_call_back_count->increment(1);
-#endif
-#endif // USE_MEM_TRACKER
-}
-
SwitchBthread::SwitchBthread() {
#ifdef USE_MEM_TRACKER
_bthread_context =
static_cast<ThreadContext*>(bthread_getspecific(btls_key));
diff --git a/be/src/runtime/thread_context.h b/be/src/runtime/thread_context.h
index 0acc7c6556..9379f12a9d 100644
--- a/be/src/runtime/thread_context.h
+++ b/be/src/runtime/thread_context.h
@@ -33,9 +33,11 @@
#define SCOPED_CONSUME_MEM_TRACKER(mem_tracker) \
auto VARNAME_LINENUM(add_mem_consumer) =
doris::AddThreadMemTrackerConsumer(mem_tracker)
-#define SCOPED_UPDATE_MEM_EXCEED_CALL_BACK(cancel_msg, ...) \
- auto VARNAME_LINENUM(update_exceed_cb) = \
- doris::UpdateMemExceedCallBack(cancel_msg, ##__VA_ARGS__)
+// Attach to task when thread starts
+#define SCOPED_ATTACH_TASK(arg1, ...) \
+ auto VARNAME_LINENUM(attach_task) = AttachTask(arg1, ##__VA_ARGS__)
+
+#define SCOPED_SWITCH_BTHREAD_TLS() auto VARNAME_LINENUM(switch_bthread) =
SwitchBthread()
namespace doris {
@@ -104,8 +106,8 @@ public:
BRPC = 5
// to be added ...
};
- inline static const std::string TaskTypeStr[] = {"UNKNOWN", "QUERY",
"LOAD", "COMPACTION",
- "STORAGE"};
+ inline static const std::string TaskTypeStr[] = {"UNKNOWN", "QUERY",
"LOAD",
+ "COMPACTION", "STORAGE",
"BRPC"};
public:
ThreadContext() {
@@ -139,8 +141,7 @@ public:
_type = type;
_task_id = task_id;
_fragment_instance_id = fragment_instance_id;
- _thread_mem_tracker_mgr->attach_limiter_tracker(TaskTypeStr[_type],
task_id,
- fragment_instance_id,
mem_tracker);
+ _thread_mem_tracker_mgr->attach_limiter_tracker(task_id,
fragment_instance_id, mem_tracker);
}
void detach_task() {
@@ -225,19 +226,6 @@ public:
~AddThreadMemTrackerConsumer();
};
-class UpdateMemExceedCallBack {
-public:
- explicit UpdateMemExceedCallBack(const std::string& cancel_msg, bool
cancel_task = true,
- ExceedCallBack cb_func = nullptr);
-
- ~UpdateMemExceedCallBack();
-
-private:
-#ifdef USE_MEM_TRACKER
- MemExceedCallBackInfo _old_cb;
-#endif
-};
-
class SwitchBthread {
public:
explicit SwitchBthread();
@@ -261,15 +249,8 @@ public:
}
};
-#define SCOPED_SWITCH_BTHREAD_TLS() auto VARNAME_LINENUM(switch_bthread) =
SwitchBthread()
-
-// Attach to task when thread starts
-#define SCOPED_ATTACH_TASK(arg1, ...) \
- auto VARNAME_LINENUM(attach_task) = AttachTask(arg1, ##__VA_ARGS__)
-
#define STOP_CHECK_THREAD_MEM_TRACKER_LIMIT() \
auto VARNAME_LINENUM(stop_check_limit) = StopCheckThreadMemTrackerLimit()
-
#define CONSUME_THREAD_MEM_TRACKER(size) \
doris::thread_context()->_thread_mem_tracker_mgr->consume(size)
#define RELEASE_THREAD_MEM_TRACKER(size) \
@@ -278,5 +259,8 @@ public:
doris::thread_context()->_thread_mem_tracker_mgr->transfer_to(size,
tracker)
#define THREAD_MEM_TRACKER_TRANSFER_FROM(size, tracker) \
doris::thread_context()->_thread_mem_tracker_mgr->transfer_from(size,
tracker)
-
+#define RETURN_LIMIT_EXCEEDED(state, msg, ...) \
+ return doris::thread_context() \
+ ->_thread_mem_tracker_mgr->limiter_mem_tracker() \
+ ->mem_limit_exceeded(state, msg, ##__VA_ARGS__);
} // namespace doris
diff --git a/be/src/vec/exec/join/vhash_join_node.cpp b/be/src/vec/exec/join/vhash_join_node.cpp
index 6cbdcfa53f..e7ae1c3a91 100644
--- a/be/src/vec/exec/join/vhash_join_node.cpp
+++ b/be/src/vec/exec/join/vhash_join_node.cpp
@@ -1115,7 +1115,6 @@ void HashJoinNode::_hash_table_build_thread(RuntimeState* state, std::promise<St
Status HashJoinNode::_hash_table_build(RuntimeState* state) {
RETURN_IF_ERROR(child(1)->open(state));
- SCOPED_UPDATE_MEM_EXCEED_CALL_BACK("Hash join, while constructing the hash
table.");
SCOPED_TIMER(_build_timer);
MutableBlock mutable_block(child(1)->row_desc().tuple_descriptors());
diff --git a/be/src/vec/exec/vaggregation_node.cpp b/be/src/vec/exec/vaggregation_node.cpp
index 1209750203..80817a0095 100644
--- a/be/src/vec/exec/vaggregation_node.cpp
+++ b/be/src/vec/exec/vaggregation_node.cpp
@@ -401,7 +401,6 @@ Status AggregationNode::prepare(RuntimeState* state) {
Status AggregationNode::open(RuntimeState* state) {
START_AND_SCOPE_SPAN(state->get_tracer(), span, "AggregationNode::open");
SCOPED_TIMER(_runtime_profile->total_time_counter());
- SCOPED_UPDATE_MEM_EXCEED_CALL_BACK("aggregator, while execute open.");
RETURN_IF_ERROR(ExecNode::open(state));
SCOPED_CONSUME_MEM_TRACKER(mem_tracker());
@@ -445,7 +445,6 @@ Status AggregationNode::get_next(RuntimeState* state, Block* block, bool* eos) {
INIT_AND_SCOPE_GET_NEXT_SPAN(state->get_tracer(), _get_next_span,
"AggregationNode::get_next");
SCOPED_TIMER(_runtime_profile->total_time_counter());
SCOPED_CONSUME_MEM_TRACKER(mem_tracker());
- SCOPED_UPDATE_MEM_EXCEED_CALL_BACK("aggregator, while execute get_next.");
if (_is_streaming_preagg) {
bool child_eos = false;
diff --git a/be/src/vec/exec/vcross_join_node.cpp b/be/src/vec/exec/vcross_join_node.cpp
index d3c9de5843..4a8c2f3b17 100644
--- a/be/src/vec/exec/vcross_join_node.cpp
+++ b/be/src/vec/exec/vcross_join_node.cpp
@@ -53,7 +53,6 @@ Status VCrossJoinNode::close(RuntimeState* state) {
Status VCrossJoinNode::construct_build_side(RuntimeState* state) {
// Do a full scan of child(1) and store all build row batches.
RETURN_IF_ERROR(child(1)->open(state));
- SCOPED_UPDATE_MEM_EXCEED_CALL_BACK("Vec Cross join, while getting next
from the child 1");
bool eos = false;
while (true) {
diff --git a/be/src/vec/exec/vset_operation_node.cpp b/be/src/vec/exec/vset_operation_node.cpp
index 11a85ebd47..88747512e8 100644
--- a/be/src/vec/exec/vset_operation_node.cpp
+++ b/be/src/vec/exec/vset_operation_node.cpp
@@ -233,7 +233,6 @@ void VSetOperationNode::hash_table_init() {
//build a hash table from child(0)
Status VSetOperationNode::hash_table_build(RuntimeState* state) {
RETURN_IF_ERROR(child(0)->open(state));
- SCOPED_UPDATE_MEM_EXCEED_CALL_BACK("Vec Set Operation Node, while
constructing the hash table");
Block block;
MutableBlock mutable_block(child(0)->row_desc().tuple_descriptors());
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]