This is an automated email from the ASF dual-hosted git repository.
dataroaring pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.1 by this push:
new b4e136bfccd [performance](move-memtable) async close tablet streams
(#41156 & #43813) (#44128)
b4e136bfccd is described below
commit b4e136bfccdc9636a8df9307d05631deb8e008ac
Author: Kaijie Chen <[email protected]>
AuthorDate: Tue Nov 19 14:14:53 2024 +0800
[performance](move-memtable) async close tablet streams (#41156 & #43813)
(#44128)
backport #41156 and #43813
---
be/src/runtime/load_stream.cpp | 133 +++++++++++++++++-----------------
be/src/runtime/load_stream.h | 5 +-
be/src/runtime/load_stream_writer.cpp | 12 ++-
be/src/runtime/load_stream_writer.h | 9 +++
4 files changed, 91 insertions(+), 68 deletions(-)
diff --git a/be/src/runtime/load_stream.cpp b/be/src/runtime/load_stream.cpp
index a2d9fd2f611..79427703873 100644
--- a/be/src/runtime/load_stream.cpp
+++ b/be/src/runtime/load_stream.cpp
@@ -62,7 +62,6 @@ TabletStream::TabletStream(PUniqueId load_id, int64_t id,
int64_t txn_id,
_txn_id(txn_id),
_load_stream_mgr(load_stream_mgr) {
load_stream_mgr->create_tokens(_flush_tokens);
- _status = Status::OK();
_profile = profile->create_child(fmt::format("TabletStream {}", id), true,
true);
_append_data_timer = ADD_TIMER(_profile, "AppendDataTime");
_add_segment_timer = ADD_TIMER(_profile, "AddSegmentTime");
@@ -71,7 +70,7 @@ TabletStream::TabletStream(PUniqueId load_id, int64_t id,
int64_t txn_id,
inline std::ostream& operator<<(std::ostream& ostr, const TabletStream&
tablet_stream) {
ostr << "load_id=" << tablet_stream._load_id << ", txn_id=" <<
tablet_stream._txn_id
- << ", tablet_id=" << tablet_stream._id << ", status=" <<
tablet_stream._status;
+ << ", tablet_id=" << tablet_stream._id << ", status=" <<
tablet_stream._status.status();
return ostr;
}
@@ -89,19 +88,19 @@ Status
TabletStream::init(std::shared_ptr<OlapTableSchemaParam> schema, int64_t
_load_stream_writer = std::make_shared<LoadStreamWriter>(&req, _profile);
DBUG_EXECUTE_IF("TabletStream.init.uninited_writer", {
- _status = Status::Uninitialized("fault injection");
- return _status;
+ _status.update(Status::Uninitialized("fault injection"));
+ return _status.status();
});
- _status = _load_stream_writer->init();
+ _status.update(_load_stream_writer->init());
if (!_status.ok()) {
LOG(INFO) << "failed to init rowset builder due to " << *this;
}
- return _status;
+ return _status.status();
}
Status TabletStream::append_data(const PStreamHeader& header, butil::IOBuf*
data) {
if (!_status.ok()) {
- return _status;
+ return _status.status();
}
// dispatch add_segment request
@@ -150,8 +149,8 @@ Status TabletStream::append_data(const PStreamHeader&
header, butil::IOBuf* data
if (eos && st.ok()) {
st = _load_stream_writer->close_segment(new_segid);
}
- if (!st.ok() && _status.ok()) {
- _status = st;
+ if (!st.ok()) {
+ _status.update(st);
LOG(WARNING) << "write data failed " << st << ", " << *this;
}
};
@@ -167,11 +166,11 @@ Status TabletStream::append_data(const PStreamHeader&
header, butil::IOBuf* data
timer.start();
while (flush_token->num_tasks() >= load_stream_flush_token_max_tasks) {
if (timer.elapsed_time() / 1000 / 1000 >=
load_stream_max_wait_flush_token_time_ms) {
- _status = Status::Error<true>(
- "wait flush token back pressure time is more than "
- "load_stream_max_wait_flush_token_time {}",
- load_stream_max_wait_flush_token_time_ms);
- return _status;
+ _status.update(
+ Status::Error<true>("wait flush token back pressure time
is more than "
+ "load_stream_max_wait_flush_token_time
{}",
+
load_stream_max_wait_flush_token_time_ms));
+ return _status.status();
}
bthread_usleep(2 * 1000); // 2ms
}
@@ -181,14 +180,14 @@ Status TabletStream::append_data(const PStreamHeader&
header, butil::IOBuf* data
g_load_stream_flush_running_threads << 1;
auto st = flush_token->submit_func(flush_func);
if (!st.ok()) {
- _status = st;
+ _status.update(st);
}
- return _status;
+ return _status.status();
}
Status TabletStream::add_segment(const PStreamHeader& header, butil::IOBuf*
data) {
if (!_status.ok()) {
- return _status;
+ return _status.status();
}
SCOPED_TIMER(_add_segment_timer);
@@ -207,17 +206,17 @@ Status TabletStream::add_segment(const PStreamHeader&
header, butil::IOBuf* data
{
std::lock_guard lock_guard(_lock);
if (!_segids_mapping.contains(src_id)) {
- _status = Status::InternalError(
+ _status.update(Status::InternalError(
"add segment failed, no segment written by this src be
yet, src_id={}, "
"segment_id={}",
- src_id, segid);
- return _status;
+ src_id, segid));
+ return _status.status();
}
if (segid >= _segids_mapping[src_id]->size()) {
- _status = Status::InternalError(
+ _status.update(Status::InternalError(
"add segment failed, segment is never written, src_id={},
segment_id={}",
- src_id, segid);
- return _status;
+ src_id, segid));
+ return _status.status();
}
new_segid = _segids_mapping[src_id]->at(segid);
}
@@ -226,76 +225,76 @@ Status TabletStream::add_segment(const PStreamHeader&
header, butil::IOBuf* data
auto add_segment_func = [this, new_segid, stat, flush_schema]() {
signal::set_signal_task_id(_load_id);
auto st = _load_stream_writer->add_segment(new_segid, stat,
flush_schema);
- if (!st.ok() && _status.ok()) {
- _status = st;
+ if (!st.ok()) {
+ _status.update(st);
LOG(INFO) << "add segment failed " << *this;
}
};
auto& flush_token = _flush_tokens[new_segid % _flush_tokens.size()];
auto st = flush_token->submit_func(add_segment_func);
if (!st.ok()) {
- _status = st;
+ _status.update(st);
}
- return _status;
+ return _status.status();
}
-Status TabletStream::close() {
- if (!_status.ok()) {
- return _status;
- }
-
- SCOPED_TIMER(_close_wait_timer);
+Status TabletStream::_run_in_heavy_work_pool(std::function<Status()> fn) {
bthread::Mutex mu;
std::unique_lock<bthread::Mutex> lock(mu);
bthread::ConditionVariable cv;
- auto wait_func = [this, &mu, &cv] {
+ auto st = Status::OK();
+ auto func = [this, &mu, &cv, &st, &fn] {
signal::set_signal_task_id(_load_id);
- for (auto& token : _flush_tokens) {
- token->wait();
- }
+ st = fn();
std::lock_guard<bthread::Mutex> lock(mu);
cv.notify_one();
};
- bool ret = _load_stream_mgr->heavy_work_pool()->try_offer(wait_func);
- if (ret) {
- cv.wait(lock);
- } else {
- _status = Status::Error<ErrorCode::INTERNAL_ERROR>(
+ bool ret = _load_stream_mgr->heavy_work_pool()->try_offer(func);
+ if (!ret) {
+ return Status::Error<ErrorCode::INTERNAL_ERROR>(
"there is not enough thread resource for close load");
- return _status;
}
+ cv.wait(lock);
+ return st;
+}
- if (_check_num_segments && (_next_segid.load() != _num_segments)) {
- _status = Status::Corruption(
- "segment num mismatch in tablet {}, expected: {}, actual: {},
load_id: {}", _id,
- _num_segments, _next_segid.load(), print_id(_load_id));
- return _status;
+void TabletStream::pre_close() {
+ if (!_status.ok()) {
+ return;
}
+ SCOPED_TIMER(_close_wait_timer);
+ _status.update(_run_in_heavy_work_pool([this]() {
+ for (auto& token : _flush_tokens) {
+ token->wait();
+ }
+ return Status::OK();
+ }));
// it is necessary to check status after wait_func,
// for create_rowset could fail during add_segment when loading to MOW
table,
// in this case, should skip close to avoid submit_calc_delete_bitmap_task
which could cause coredump.
if (!_status.ok()) {
- return _status;
+ return;
}
- auto close_func = [this, &mu, &cv]() {
- signal::set_signal_task_id(_load_id);
- auto st = _load_stream_writer->close();
- if (!st.ok() && _status.ok()) {
- _status = st;
- }
- std::lock_guard<bthread::Mutex> lock(mu);
- cv.notify_one();
- };
- ret = _load_stream_mgr->heavy_work_pool()->try_offer(close_func);
- if (ret) {
- cv.wait(lock);
- } else {
- _status = Status::Error<ErrorCode::INTERNAL_ERROR>(
- "there is not enough thread resource for close load");
+ if (_check_num_segments && (_next_segid.load() != _num_segments)) {
+ _status.update(Status::Corruption(
+ "segment num mismatch in tablet {}, expected: {}, actual: {},
load_id: {}", _id,
+ _num_segments, _next_segid.load(), print_id(_load_id)));
+ return;
}
- return _status;
+
+ _status.update(_run_in_heavy_work_pool([this]() { return
_load_stream_writer->pre_close(); }));
+}
+
+Status TabletStream::close() {
+ if (!_status.ok()) {
+ return _status.status();
+ }
+
+ SCOPED_TIMER(_close_wait_timer);
+ _status.update(_run_in_heavy_work_pool([this]() { return
_load_stream_writer->close(); }));
+ return _status.status();
}
IndexStream::IndexStream(PUniqueId load_id, int64_t id, int64_t txn_id,
@@ -363,6 +362,10 @@ void IndexStream::close(const std::vector<PTabletID>&
tablets_to_commit,
}
}
+ for (auto& [_, tablet_stream] : _tablet_streams_map) {
+ tablet_stream->pre_close();
+ }
+
for (auto& [_, tablet_stream] : _tablet_streams_map) {
auto st = tablet_stream->close();
if (st.ok()) {
diff --git a/be/src/runtime/load_stream.h b/be/src/runtime/load_stream.h
index 829c28b15e8..462fce5a147 100644
--- a/be/src/runtime/load_stream.h
+++ b/be/src/runtime/load_stream.h
@@ -54,12 +54,15 @@ public:
Status add_segment(const PStreamHeader& header, butil::IOBuf* data);
void add_num_segments(int64_t num_segments) { _num_segments +=
num_segments; }
void disable_num_segments_check() { _check_num_segments = false; }
+ void pre_close();
Status close();
int64_t id() const { return _id; }
friend std::ostream& operator<<(std::ostream& ostr, const TabletStream&
tablet_stream);
private:
+ Status _run_in_heavy_work_pool(std::function<Status()> fn);
+
int64_t _id;
LoadStreamWriterSharedPtr _load_stream_writer;
std::vector<std::unique_ptr<ThreadPoolToken>> _flush_tokens;
@@ -68,7 +71,7 @@ private:
int64_t _num_segments = 0;
bool _check_num_segments = true;
bthread::Mutex _lock;
- Status _status;
+ AtomicStatus _status;
PUniqueId _load_id;
int64_t _txn_id;
RuntimeProfile* _profile = nullptr;
diff --git a/be/src/runtime/load_stream_writer.cpp
b/be/src/runtime/load_stream_writer.cpp
index b5fd23ba22b..9ed73c50b49 100644
--- a/be/src/runtime/load_stream_writer.cpp
+++ b/be/src/runtime/load_stream_writer.cpp
@@ -194,8 +194,7 @@ Status LoadStreamWriter::add_segment(uint32_t segid, const
SegmentStatistics& st
return _rowset_writer->add_segment(segid, stat, flush_schema);
}
-Status LoadStreamWriter::close() {
- std::lock_guard<std::mutex> l(_lock);
+Status LoadStreamWriter::_pre_close() {
SCOPED_ATTACH_TASK(_query_thread_context);
if (!_is_init) {
// if this delta writer is not initialized, but close() is called.
@@ -222,6 +221,15 @@ Status LoadStreamWriter::close() {
RETURN_IF_ERROR(_rowset_builder->build_rowset());
RETURN_IF_ERROR(_rowset_builder->submit_calc_delete_bitmap_task());
+ _pre_closed = true;
+ return Status::OK();
+}
+
+Status LoadStreamWriter::close() {
+ std::lock_guard<std::mutex> l(_lock);
+ if (!_pre_closed) {
+ RETURN_IF_ERROR(_pre_close());
+ }
RETURN_IF_ERROR(_rowset_builder->wait_calc_delete_bitmap());
RETURN_IF_ERROR(_rowset_builder->commit_txn());
diff --git a/be/src/runtime/load_stream_writer.h
b/be/src/runtime/load_stream_writer.h
index 9e3fce3c7db..bd27093bd2d 100644
--- a/be/src/runtime/load_stream_writer.h
+++ b/be/src/runtime/load_stream_writer.h
@@ -67,12 +67,21 @@ public:
Status add_segment(uint32_t segid, const SegmentStatistics& stat,
TabletSchemaSPtr flush_chema);
+ Status pre_close() {
+ std::lock_guard<std::mutex> l(_lock);
+ return _pre_close();
+ }
+
// wait for all memtables to be flushed.
Status close();
private:
+ // without lock
+ Status _pre_close();
+
bool _is_init = false;
bool _is_canceled = false;
+ bool _pre_closed = false;
WriteRequest _req;
std::unique_ptr<BaseRowsetBuilder> _rowset_builder;
std::shared_ptr<RowsetWriter> _rowset_writer;
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]