This is an automated email from the ASF dual-hosted git repository.
dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 942b31038f [fix](memory) Fix BE OOM when load -238 fail (#12666)
942b31038f is described below
commit 942b31038ff4569f342d5c140a9ab04b3bd4fca9
Author: Xinyi Zou <[email protected]>
AuthorDate: Sat Sep 17 00:17:53 2022 +0800
[fix](memory) Fix BE OOM when load -238 fail (#12666)
When a load channel exceeds its mem limit, a memtable flush is triggered to
reduce memory. If that flush fails, an error message is returned and the load
is terminated. The flush failure is usually error code -238: because the
memtable is flushed frequently after the load channel exceeds the mem limit,
the number of segments exceeds the max value.
---
be/src/runtime/load_channel.cpp | 22 ---------------------
be/src/runtime/load_channel.h | 29 +++++++++++++++++++++++++--
be/src/runtime/load_channel_mgr.cpp | 33 -------------------------------
be/src/runtime/load_channel_mgr.h | 39 +++++++++++++++++++++++++++++++++++--
be/src/runtime/tablets_channel.cpp | 27 +++++++++++++++++++++++--
be/src/runtime/tablets_channel.h | 3 ++-
6 files changed, 91 insertions(+), 62 deletions(-)
diff --git a/be/src/runtime/load_channel.cpp b/be/src/runtime/load_channel.cpp
index c2e3d5c644..48d62a4445 100644
--- a/be/src/runtime/load_channel.cpp
+++ b/be/src/runtime/load_channel.cpp
@@ -92,28 +92,6 @@ Status
LoadChannel::_get_tablets_channel(std::shared_ptr<TabletsChannel>& channe
return Status::OK();
}
-void LoadChannel::handle_mem_exceed_limit(bool force) {
- // lock so that only one thread can check mem limit
- std::lock_guard<std::mutex> l(_lock);
- if (!(force || _mem_tracker->limit_exceeded())) {
- return;
- }
-
- if (!force) {
- LOG(INFO) << "reducing memory of " << *this << " because its mem
consumption "
- << _mem_tracker->consumption() << " has exceeded limit " <<
_mem_tracker->limit();
- }
-
- std::shared_ptr<TabletsChannel> channel;
- if (_find_largest_consumption_channel(&channel)) {
- channel->reduce_mem_usage(_mem_tracker->limit());
- } else {
- // should not happen, add log to observe
- LOG(WARNING) << "fail to find suitable tablets-channel when memory
exceed. "
- << "load_id=" << _load_id;
- }
-}
-
// lock should be held when calling this method
bool
LoadChannel::_find_largest_consumption_channel(std::shared_ptr<TabletsChannel>*
channel) {
int64_t max_consume = 0;
diff --git a/be/src/runtime/load_channel.h b/be/src/runtime/load_channel.h
index 565cd67ebf..125cd12fe1 100644
--- a/be/src/runtime/load_channel.h
+++ b/be/src/runtime/load_channel.h
@@ -64,7 +64,8 @@ public:
// If yes, it will pick a tablets channel to try to reduce memory
consumption.
// If force is true, even if this load channel does not exceeds limit, it
will still
// try to reduce memory.
- void handle_mem_exceed_limit(bool force);
+ template <typename TabletWriterAddResult>
+ Status handle_mem_exceed_limit(bool force, TabletWriterAddResult*
response);
int64_t mem_consumption() const { return _mem_tracker->consumption(); }
@@ -141,7 +142,7 @@ Status LoadChannel::add_batch(const TabletWriterAddRequest&
request,
}
// 2. check if mem consumption exceed limit
- handle_mem_exceed_limit(false);
+ RETURN_IF_ERROR(handle_mem_exceed_limit(false, response));
// 3. add batch to tablets channel
if constexpr (std::is_same_v<TabletWriterAddRequest,
PTabletWriterAddBatchRequest>) {
@@ -172,4 +173,28 @@ inline std::ostream& operator<<(std::ostream& os, const
LoadChannel& load_channe
return os;
}
+template <typename TabletWriterAddResult>
+Status LoadChannel::handle_mem_exceed_limit(bool force, TabletWriterAddResult*
response) {
+ // lock so that only one thread can check mem limit
+ std::lock_guard<std::mutex> l(_lock);
+ if (!(force || _mem_tracker->limit_exceeded())) {
+ return Status::OK();
+ }
+
+ if (!force) {
+ LOG(INFO) << "reducing memory of " << *this << " because its mem
consumption "
+ << _mem_tracker->consumption() << " has exceeded limit " <<
_mem_tracker->limit();
+ }
+
+ std::shared_ptr<TabletsChannel> channel;
+ if (_find_largest_consumption_channel(&channel)) {
+ return channel->reduce_mem_usage(_mem_tracker->limit(), response);
+ } else {
+ // should not happen, add log to observe
+ LOG(WARNING) << "fail to find suitable tablets-channel when memory
exceed. "
+ << "load_id=" << _load_id;
+ }
+ return Status::OK();
+}
+
} // namespace doris
diff --git a/be/src/runtime/load_channel_mgr.cpp
b/be/src/runtime/load_channel_mgr.cpp
index c252fe2aff..e45fabd994 100644
--- a/be/src/runtime/load_channel_mgr.cpp
+++ b/be/src/runtime/load_channel_mgr.cpp
@@ -139,39 +139,6 @@ void LoadChannelMgr::_finish_load_channel(const UniqueId
load_id) {
VLOG_CRITICAL << "removed load channel " << load_id;
}
-void LoadChannelMgr::_handle_mem_exceed_limit() {
- // lock so that only one thread can check mem limit
- std::lock_guard<std::mutex> l(_lock);
- if (!_mem_tracker->limit_exceeded()) {
- return;
- }
-
- int64_t max_consume = 0;
- std::shared_ptr<LoadChannel> channel;
- for (auto& kv : _load_channels) {
- if (kv.second->is_high_priority()) {
- // do not select high priority channel to reduce memory
- // to avoid blocking them.
- continue;
- }
- if (kv.second->mem_consumption() > max_consume) {
- max_consume = kv.second->mem_consumption();
- channel = kv.second;
- }
- }
- if (max_consume == 0) {
- // should not happen, add log to observe
- LOG(WARNING) << "failed to find suitable load channel when total load
mem limit exceed";
- return;
- }
- DCHECK(channel.get() != nullptr);
-
- // force reduce mem limit of the selected channel
- LOG(INFO) << "reducing memory of " << *channel << " because total load mem
consumption "
- << _mem_tracker->consumption() << " has exceeded limit " <<
_mem_tracker->limit();
- channel->handle_mem_exceed_limit(true);
-}
-
Status LoadChannelMgr::cancel(const PTabletWriterCancelRequest& params) {
UniqueId load_id(params.id());
std::shared_ptr<LoadChannel> cancelled_channel;
diff --git a/be/src/runtime/load_channel_mgr.h
b/be/src/runtime/load_channel_mgr.h
index af9d3d6240..ebcba803e5 100644
--- a/be/src/runtime/load_channel_mgr.h
+++ b/be/src/runtime/load_channel_mgr.h
@@ -66,7 +66,8 @@ private:
void _finish_load_channel(UniqueId load_id);
// check if the total load mem consumption exceeds limit.
// If yes, it will pick a load channel to try to reduce memory consumption.
- void _handle_mem_exceed_limit();
+ template <typename TabletWriterAddResult>
+ Status _handle_mem_exceed_limit(TabletWriterAddResult* response);
Status _start_bg_worker();
@@ -125,7 +126,7 @@ Status LoadChannelMgr::add_batch(const
TabletWriterAddRequest& request,
// 2. check if mem consumption exceed limit
// If this is a high priority load task, do not handle this.
// because this may block for a while, which may lead to rpc timeout.
- _handle_mem_exceed_limit();
+ RETURN_IF_ERROR(_handle_mem_exceed_limit(response));
}
// 3. add batch to load channel
@@ -140,4 +141,38 @@ Status LoadChannelMgr::add_batch(const
TabletWriterAddRequest& request,
return Status::OK();
}
+template <typename TabletWriterAddResult>
+Status LoadChannelMgr::_handle_mem_exceed_limit(TabletWriterAddResult*
response) {
+ // lock so that only one thread can check mem limit
+ std::lock_guard<std::mutex> l(_lock);
+ if (!_mem_tracker->limit_exceeded()) {
+ return Status::OK();
+ }
+
+ int64_t max_consume = 0;
+ std::shared_ptr<LoadChannel> channel;
+ for (auto& kv : _load_channels) {
+ if (kv.second->is_high_priority()) {
+ // do not select high priority channel to reduce memory
+ // to avoid blocking them.
+ continue;
+ }
+ if (kv.second->mem_consumption() > max_consume) {
+ max_consume = kv.second->mem_consumption();
+ channel = kv.second;
+ }
+ }
+ if (max_consume == 0) {
+ // should not happen, add log to observe
+ LOG(WARNING) << "failed to find suitable load channel when total load
mem limit exceed";
+ return Status::OK();
+ }
+ DCHECK(channel.get() != nullptr);
+
+ // force reduce mem limit of the selected channel
+ LOG(INFO) << "reducing memory of " << *channel << " because total load mem
consumption "
+ << _mem_tracker->consumption() << " has exceeded limit " <<
_mem_tracker->limit();
+ return channel->handle_mem_exceed_limit(true, response);
+}
+
} // namespace doris
diff --git a/be/src/runtime/tablets_channel.cpp
b/be/src/runtime/tablets_channel.cpp
index 0681a350fa..274512a180 100644
--- a/be/src/runtime/tablets_channel.cpp
+++ b/be/src/runtime/tablets_channel.cpp
@@ -194,7 +194,8 @@ void TabletsChannel::_close_wait(DeltaWriter* writer,
}
}
-Status TabletsChannel::reduce_mem_usage(int64_t mem_limit) {
+template <typename TabletWriterAddResult>
+Status TabletsChannel::reduce_mem_usage(int64_t mem_limit,
TabletWriterAddResult* response) {
std::lock_guard<std::mutex> l(_lock);
if (_state == kFinished) {
// TabletsChannel is closed without LoadChannel's lock,
@@ -237,11 +238,29 @@ Status TabletsChannel::reduce_mem_usage(int64_t
mem_limit) {
}
}
VLOG_CRITICAL << "flush " << counter << " memtables to reduce memory: " <<
sum;
+ google::protobuf::RepeatedPtrField<PTabletError>* tablet_errors =
+ response->mutable_tablet_errors();
for (int i = 0; i < counter; i++) {
- writers[i]->flush_memtable_and_wait(false);
+ Status st = writers[i]->flush_memtable_and_wait(false);
+ if (!st.ok()) {
+ auto err_msg = strings::Substitute(
+ "tablet writer failed to reduce mem consumption by
flushing memtable, "
+ "tablet_id=$0, txn_id=$1, err=$2, errcode=$3, msg:$4",
+ writers[i]->tablet_id(), _txn_id, st.code(),
st.precise_code(),
+ st.get_error_msg());
+ LOG(WARNING) << err_msg;
+ PTabletError* error = tablet_errors->Add();
+ error->set_tablet_id(writers[i]->tablet_id());
+ error->set_msg(err_msg);
+ _broken_tablets.insert(writers[i]->tablet_id());
+ }
}
for (int i = 0; i < counter; i++) {
+ if (_broken_tablets.find(writers[i]->tablet_id()) !=
_broken_tablets.end()) {
+ // skip broken tablets
+ continue;
+ }
Status st = writers[i]->wait_flush();
if (!st.ok()) {
return Status::InternalError(
@@ -402,4 +421,8 @@ TabletsChannel::add_batch<PTabletWriterAddBatchRequest,
PTabletWriterAddBatchRes
template Status
TabletsChannel::add_batch<PTabletWriterAddBlockRequest,
PTabletWriterAddBlockResult>(
PTabletWriterAddBlockRequest const&, PTabletWriterAddBlockResult*);
+template Status TabletsChannel::reduce_mem_usage<PTabletWriterAddBatchResult>(
+ int64_t, PTabletWriterAddBatchResult*);
+template Status TabletsChannel::reduce_mem_usage<PTabletWriterAddBlockResult>(
+ int64_t, PTabletWriterAddBlockResult*);
} // namespace doris
diff --git a/be/src/runtime/tablets_channel.h b/be/src/runtime/tablets_channel.h
index d4d7076473..dc54740b09 100644
--- a/be/src/runtime/tablets_channel.h
+++ b/be/src/runtime/tablets_channel.h
@@ -92,7 +92,8 @@ public:
// eg. flush the largest memtable immediately.
// return Status::OK if mem is reduced.
// no-op when this channel has been closed or cancelled
- Status reduce_mem_usage(int64_t mem_limit);
+ template <typename TabletWriterAddResult>
+ Status reduce_mem_usage(int64_t mem_limit, TabletWriterAddResult*
response);
int64_t mem_consumption() const { return _mem_tracker->consumption(); }
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]