This is an automated email from the ASF dual-hosted git repository.
dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 7682c08af0 [improvement](load) reduce memory in batch for small load channels (#14214)
7682c08af0 is described below
commit 7682c08af0b8e8619a099cf8a682238d2cd6568e
Author: zhannngchen <[email protected]>
AuthorDate: Sat Nov 12 22:14:01 2022 +0800
[improvement](load) reduce memory in batch for small load channels (#14214)
---
be/src/runtime/load_channel_mgr.cpp | 6 +++
be/src/runtime/load_channel_mgr.h | 103 ++++++++++++++++++++++++++----------
2 files changed, 80 insertions(+), 29 deletions(-)
diff --git a/be/src/runtime/load_channel_mgr.cpp b/be/src/runtime/load_channel_mgr.cpp
index 6eef349fe1..c81ace2487 100644
--- a/be/src/runtime/load_channel_mgr.cpp
+++ b/be/src/runtime/load_channel_mgr.cpp
@@ -70,6 +70,12 @@ LoadChannelMgr::~LoadChannelMgr() {
Status LoadChannelMgr::init(int64_t process_mem_limit) {
_load_hard_mem_limit = calc_process_max_load_memory(process_mem_limit);
_load_soft_mem_limit = _load_hard_mem_limit * config::load_process_soft_mem_limit_percent / 100;
+ // If a load channel's memory consumption is no more than 10% of the hard limit, it's not
+ // worth reducing memory on it. Since we only reduce 1/3 of the memory for one load channel,
+ // for a channel consuming 10% of the hard limit we can only release about 3% of memory each
+ // time, which is not very helpful for relieving memory pressure.
+ // In this case we need to pick multiple load channels to reduce memory more effectively.
+ _load_channel_min_mem_to_reduce = _load_hard_mem_limit * 0.1;
_mem_tracker = std::make_unique<MemTracker>("LoadChannelMgr");
_mem_tracker_set = std::make_unique<MemTrackerLimiter>(MemTrackerLimiter::Type::LOAD, "LoadChannelMgrTrackerSet");
diff --git a/be/src/runtime/load_channel_mgr.h b/be/src/runtime/load_channel_mgr.h
index 3f27eafd0e..c1c86a7890 100644
--- a/be/src/runtime/load_channel_mgr.h
+++ b/be/src/runtime/load_channel_mgr.h
@@ -87,7 +87,6 @@ protected:
std::mutex _lock;
// load id -> load channel
std::unordered_map<UniqueId, std::shared_ptr<LoadChannel>> _load_channels;
- std::shared_ptr<LoadChannel> _reduce_memory_channel = nullptr;
Cache* _last_success_channel = nullptr;
// check the total load channel mem consumption of this Backend
@@ -96,6 +95,14 @@ protected:
std::unique_ptr<MemTrackerLimiter> _mem_tracker_set;
int64_t _load_hard_mem_limit = -1;
int64_t _load_soft_mem_limit = -1;
+ // By default, we try to reduce memory on the load channel with the largest mem consumption,
+ // but if there are lots of small load channels, even the largest one may consume very little
+ // memory; in this case we need to pick multiple load channels to reduce memory more effectively.
+ // `_load_channel_min_mem_to_reduce` is used to determine whether the largest load channel's
+ // memory consumption is big enough.
+ int64_t _load_channel_min_mem_to_reduce = -1;
+ bool _soft_reduce_mem_in_progress = false;
// If hard limit reached, one thread will trigger load channel flush,
// other threads should wait on the condition variable.
@@ -171,10 +178,9 @@ Status LoadChannelMgr::_handle_mem_exceed_limit(TabletWriterAddResult* response)
MemInfo::proc_mem_no_allocator_cache() < process_mem_limit) {
return Status::OK();
}
- // Pick load channel to reduce memory.
- std::shared_ptr<LoadChannel> channel;
// Indicate whether current thread is reducing mem on hard limit.
bool reducing_mem_on_hard_limit = false;
+ std::vector<std::shared_ptr<LoadChannel>> channels_to_reduce_mem;
{
std::unique_lock<std::mutex> l(_lock);
while (_should_wait_flush) {
@@ -182,41 +188,74 @@ Status LoadChannelMgr::_handle_mem_exceed_limit(TabletWriterAddResult* response)
<< ", waiting for flush";
_wait_flush_cond.wait(l);
}
+ bool hard_limit_reached = _mem_tracker->consumption() >= _load_hard_mem_limit ||
+                           MemInfo::proc_mem_no_allocator_cache() >= process_mem_limit;
// Some other thread is flushing data, and not reached hard limit now,
// we don't need to handle mem limit in current thread.
- if (_reduce_memory_channel != nullptr &&
- _mem_tracker->consumption() < _load_hard_mem_limit &&
- MemInfo::proc_mem_no_allocator_cache() < process_mem_limit) {
+ if (_soft_reduce_mem_in_progress && !hard_limit_reached) {
return Status::OK();
}
- // We need to pick a LoadChannel to reduce memory usage.
- // If `_reduce_memory_channel` is not null, it means the hard limit is
- // exceed now, we still need to pick a load channel again. Because
- // `_reduce_memory_channel` might not be the largest consumer now.
- int64_t max_consume = 0;
+ // Pick LoadChannels to reduce memory usage. If some other thread is reducing memory
+ // due to the soft limit and we have now reached the hard limit, the current thread may
+ // pick some duplicate channels and trigger a duplicate memory-reducing process.
+ // But the load channel's memory-reducing process is thread safe: only 1 thread can
+ // reduce memory at a time, other threads will wait on a condition variable, and
+ // after the reduce-memory work is finished, all threads will return.
+ using ChannelMemPair = std::pair<std::shared_ptr<LoadChannel>, int64_t>;
+ std::vector<ChannelMemPair> candidate_channels;
+ int64_t total_consume = 0;
for (auto& kv : _load_channels) {
if (kv.second->is_high_priority()) {
// do not select high priority channel to reduce memory
// to avoid blocking them.
continue;
}
- if (kv.second->mem_consumption() > max_consume) {
- max_consume = kv.second->mem_consumption();
- channel = kv.second;
- }
+ int64_t mem = kv.second->mem_consumption();
+ // save the mem consumption, since the calculation might be expensive.
+ candidate_channels.push_back(std::make_pair(kv.second, mem));
+ total_consume += mem;
}
- if (max_consume == 0) {
+
+ if (candidate_channels.empty()) {
// should not happen, add log to observe
- LOG(WARNING) << "failed to find suitable load channel when total
load mem limit exceed";
+ LOG(WARNING) << "All load channels are high priority, failed to
find suitable"
+ << "channels to reduce memory when total load mem
limit exceed";
return Status::OK();
}
- DCHECK(channel.get() != nullptr);
- _reduce_memory_channel = channel;
+
+ // sort all load channels, try to find the largest one.
+ std::sort(candidate_channels.begin(), candidate_channels.end(),
+ [](const ChannelMemPair& lhs, const ChannelMemPair& rhs) {
+ return lhs.second > rhs.second;
+ });
+
+ int64_t mem_consumption_in_picked_channel = 0;
+ auto largest_channel = *candidate_channels.begin();
+ // If some load channel is big enough, we can reduce only that one, trying our best to
+ // avoid reducing small load channels.
+ if (_load_channel_min_mem_to_reduce > 0 &&
+ largest_channel.second > _load_channel_min_mem_to_reduce) {
+ // Pick 1 load channel to reduce memory.
+ channels_to_reduce_mem.push_back(largest_channel.first);
+ mem_consumption_in_picked_channel = largest_channel.second;
+ } else {
+ // Pick multiple channels to reduce memory.
+ int64_t mem_to_flushed = total_consume / 3;
+ for (auto ch : candidate_channels) {
+ channels_to_reduce_mem.push_back(ch.first);
+ mem_consumption_in_picked_channel += ch.second;
+ if (mem_consumption_in_picked_channel >= mem_to_flushed) {
+ break;
+ }
+ }
+ }
std::ostringstream oss;
if (MemInfo::proc_mem_no_allocator_cache() < process_mem_limit) {
- oss << "reducing memory of " << *channel << " because total load
mem consumption "
+ oss << "reducing memory of " << channels_to_reduce_mem.size()
+ << " load channels (total mem consumption: " <<
mem_consumption_in_picked_channel
+ << " bytes), because total load mem consumption "
<< PrettyPrinter::print(_mem_tracker->consumption(), TUnit::BYTES)
<< " has exceeded";
if (_mem_tracker->consumption() > _load_hard_mem_limit) {
@@ -224,24 +263,30 @@ Status LoadChannelMgr::_handle_mem_exceed_limit(TabletWriterAddResult* response)
reducing_mem_on_hard_limit = true;
oss << " hard limit: " <<
PrettyPrinter::print(_load_hard_mem_limit, TUnit::BYTES);
} else {
+ _soft_reduce_mem_in_progress = true;
oss << " soft limit: " <<
PrettyPrinter::print(_load_soft_mem_limit, TUnit::BYTES);
}
} else {
_should_wait_flush = true;
reducing_mem_on_hard_limit = true;
- oss << "reducing memory of " << *channel << " because process
memory used "
- << PerfCounters::get_vm_rss_str() << " has exceeded limit "
+ oss << "reducing memory of " << channels_to_reduce_mem.size()
+ << " load channels (total mem consumption: " <<
mem_consumption_in_picked_channel
+ << " bytes), because " << PerfCounters::get_vm_rss_str() << "
has exceeded limit "
<< PrettyPrinter::print(process_mem_limit, TUnit::BYTES)
<< " , tc/jemalloc allocator cache " <<
MemInfo::allocator_cache_mem_str();
}
LOG(INFO) << oss.str();
}
- // No matter soft limit or hard limit reached, only 1 thread will wait here,
- // if hard limit reached, other threads will pend at the beginning of this
- // method.
- Status st = channel->handle_mem_exceed_limit(response);
- LOG(INFO) << "reduce memory of " << *channel << " finished";
+ Status st = Status::OK();
+ for (auto ch : channels_to_reduce_mem) {
+ uint64_t begin = GetCurrentTimeMicros();
+ int64_t mem_usage = ch->mem_consumption();
+ st = ch->handle_mem_exceed_limit(response);
+ LOG(INFO) << "reduced memory of " << *ch << ", cost "
+ << (GetCurrentTimeMicros() - begin) / 1000
+ << " ms, released memory: " << mem_usage -
ch->mem_consumption() << " bytes";
+ }
{
std::lock_guard<std::mutex> l(_lock);
@@ -251,8 +296,8 @@ Status LoadChannelMgr::_handle_mem_exceed_limit(TabletWriterAddResult* response)
_should_wait_flush = false;
_wait_flush_cond.notify_all();
}
- if (_reduce_memory_channel == channel) {
- _reduce_memory_channel = nullptr;
+ if (_soft_reduce_mem_in_progress) {
+ _soft_reduce_mem_in_progress = false;
}
}
return st;
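
Note for readers: the channel-selection heuristic added above can be summarized by the following self-contained sketch. This is not the committed code; the function name pick_channels_to_reduce, the (id, consumption) pair representation, and the numbers in main() are illustrative assumptions. The logic mirrors the patch: if the largest candidate exceeds _load_channel_min_mem_to_reduce (10% of the hard limit), only that channel is picked; otherwise channels are picked in descending size order until roughly 1/3 of the total load-channel memory is covered.

// Minimal, standalone sketch of the selection heuristic (illustrative only).
#include <algorithm>
#include <cstdint>
#include <iostream>
#include <utility>
#include <vector>

// Returns the ids of the channels that should reduce memory.
std::vector<int> pick_channels_to_reduce(std::vector<std::pair<int, int64_t>> candidates,
                                         int64_t min_mem_to_reduce) {
    std::vector<int> picked;
    if (candidates.empty()) return picked;

    int64_t total_consume = 0;
    for (const auto& c : candidates) total_consume += c.second;

    // Sort by memory consumption, largest first.
    std::sort(candidates.begin(), candidates.end(),
              [](const auto& lhs, const auto& rhs) { return lhs.second > rhs.second; });

    if (min_mem_to_reduce > 0 && candidates.front().second > min_mem_to_reduce) {
        // One channel is big enough: reduce only that one.
        picked.push_back(candidates.front().first);
        return picked;
    }

    // Otherwise pick channels until roughly 1/3 of the total consumption is covered.
    int64_t picked_mem = 0;
    for (const auto& c : candidates) {
        picked.push_back(c.first);
        picked_mem += c.second;
        if (picked_mem >= total_consume / 3) break;
    }
    return picked;
}

int main() {
    // Ten small channels of 100 MB each; with an assumed 8 GB hard limit the
    // 10% threshold is ~800 MB, so no single channel qualifies and several are picked.
    std::vector<std::pair<int, int64_t>> candidates;
    for (int i = 0; i < 10; ++i) candidates.emplace_back(i, 100LL * 1024 * 1024);
    for (int id : pick_channels_to_reduce(candidates, 800LL * 1024 * 1024)) {
        std::cout << "picked channel " << id << "\n";
    }
    return 0;
}

For example, with ten 100 MB channels and an 800 MB threshold, no single channel qualifies, so the sketch picks the four largest channels (400 MB, just past 1/3 of the 1000 MB total).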
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]