This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 3b830f89a7f [improve](move-memtable) avoid using heavy work pool during append data (#28745)
3b830f89a7f is described below
commit 3b830f89a7fa212d250baba15547f56524f1d75f
Author: Kaijie Chen <[email protected]>
AuthorDate: Fri Dec 22 22:51:30 2023 +0800
[improve](move-memtable) avoid using heavy work pool during append data (#28745)
---
be/src/runtime/load_stream.cpp | 112 +++++++++++++++++++----------------------
be/src/runtime/load_stream.h | 1 +
2 files changed, 54 insertions(+), 59 deletions(-)
diff --git a/be/src/runtime/load_stream.cpp b/be/src/runtime/load_stream.cpp
index 0c7991bde97..307cd4ef30b 100644
--- a/be/src/runtime/load_stream.cpp
+++ b/be/src/runtime/load_stream.cpp
@@ -45,7 +45,11 @@ namespace doris {
TabletStream::TabletStream(PUniqueId load_id, int64_t id, int64_t txn_id,
LoadStreamMgr* load_stream_mgr, RuntimeProfile*
profile)
- : _id(id), _next_segid(0), _load_id(load_id), _txn_id(txn_id) {
+ : _id(id),
+ _next_segid(0),
+ _load_id(load_id),
+ _txn_id(txn_id),
+ _load_stream_mgr(load_stream_mgr) {
load_stream_mgr->create_tokens(_flush_tokens);
_failed_st = std::make_shared<Status>();
_profile = profile->create_child(fmt::format("TabletStream {}", id), true,
true);
@@ -125,6 +129,7 @@ Status TabletStream::append_data(const PStreamHeader& header, butil::IOBuf* data
DCHECK(new_segid != std::numeric_limits<uint32_t>::max());
butil::IOBuf buf = data->movable();
auto flush_func = [this, new_segid, eos, buf, header]() {
+ signal::set_signal_task_id(_load_id);
auto st = _load_stream_writer->append_data(new_segid, header.offset(),
buf);
if (eos && st.ok()) {
st = _load_stream_writer->close_segment(new_segid);
@@ -166,6 +171,7 @@ Status TabletStream::add_segment(const PStreamHeader& header, butil::IOBuf* data
DCHECK(new_segid != std::numeric_limits<uint32_t>::max());
auto add_segment_func = [this, new_segid, stat, flush_schema]() {
+ signal::set_signal_task_id(_load_id);
auto st = _load_stream_writer->add_segment(new_segid, stat,
flush_schema);
if (!st.ok() && _failed_st->ok()) {
_failed_st = std::make_shared<Status>(st);
@@ -181,13 +187,44 @@ Status TabletStream::add_segment(const PStreamHeader& header, butil::IOBuf* data
Status TabletStream::close() {
SCOPED_TIMER(_close_wait_timer);
- for (auto& token : _flush_tokens) {
- token->wait();
+ bthread::Mutex mu;
+ std::unique_lock<bthread::Mutex> lock(mu);
+ bthread::ConditionVariable cv;
+ auto wait_func = [this, &mu, &cv] {
+ signal::set_signal_task_id(_load_id);
+ for (auto& token : _flush_tokens) {
+ token->wait();
+ }
+ std::lock_guard<bthread::Mutex> lock(mu);
+ cv.notify_one();
+ };
+ bool ret = _load_stream_mgr->heavy_work_pool()->try_offer(wait_func);
+ if (ret) {
+ cv.wait(lock);
+ } else {
+ return Status::Error<ErrorCode::INTERNAL_ERROR>(
+ "there is not enough thread resource for close load");
}
+
if (!_failed_st->ok()) {
return *_failed_st;
}
- return _load_stream_writer->close();
+
+ Status st = Status::OK();
+ auto close_func = [this, &mu, &cv, &st]() {
+ signal::set_signal_task_id(_load_id);
+ st = _load_stream_writer->close();
+ std::lock_guard<bthread::Mutex> lock(mu);
+ cv.notify_one();
+ };
+ ret = _load_stream_mgr->heavy_work_pool()->try_offer(close_func);
+ if (ret) {
+ cv.wait(lock);
+ } else {
+ return Status::Error<ErrorCode::INTERNAL_ERROR>(
+ "there is not enough thread resource for close load");
+ }
+ return st;
}
IndexStream::IndexStream(PUniqueId load_id, int64_t id, int64_t txn_id,
@@ -244,13 +281,13 @@ Status IndexStream::close(const std::vector<PTabletID>& tablets_to_commit,
}
}
- for (auto& it : _tablet_streams_map) {
- auto st = it.second->close();
+ for (auto& [_, tablet_stream] : _tablet_streams_map) {
+ auto st = tablet_stream->close();
if (st.ok()) {
- success_tablet_ids->push_back(it.second->id());
+ success_tablet_ids->push_back(tablet_stream->id());
} else {
- LOG(INFO) << "close tablet stream " << *it.second << ", status="
<< st;
- failed_tablet_ids->push_back(it.second->id());
+ LOG(INFO) << "close tablet stream " << *tablet_stream << ", status=" << st;
+ failed_tablet_ids->push_back(tablet_stream->id());
}
}
return Status::OK();
@@ -308,37 +345,13 @@ Status LoadStream::close(int64_t src_id, const std::vector<PTabletID>& tablets_t
return Status::OK();
}
- Status st = Status::OK();
- {
- bthread::Mutex mutex;
- std::unique_lock<bthread::Mutex> lock(mutex);
- bthread::ConditionVariable cond;
- bool ret = _load_stream_mgr->heavy_work_pool()->try_offer(
- [this, &success_tablet_ids, &failed_tablet_ids, &mutex, &cond,
&st]() {
- signal::set_signal_task_id(_load_id);
- for (auto& it : _index_streams_map) {
- st = it.second->close(_tablets_to_commit,
success_tablet_ids,
- failed_tablet_ids);
- if (!st.ok()) {
- std::unique_lock<bthread::Mutex> lock(mutex);
- cond.notify_one();
- return;
- }
- }
- LOG(INFO) << "close load " << *this
- << ", success_tablet_num=" <<
success_tablet_ids->size()
- << ", failed_tablet_num=" <<
failed_tablet_ids->size();
- std::unique_lock<bthread::Mutex> lock(mutex);
- cond.notify_one();
- });
- if (ret) {
- cond.wait(lock);
- } else {
- return Status::Error<ErrorCode::INTERNAL_ERROR>(
- "there is not enough thread resource for close load");
- }
+ for (auto& [_, index_stream] : _index_streams_map) {
+ RETURN_IF_ERROR(
+ index_stream->close(_tablets_to_commit, success_tablet_ids,
failed_tablet_ids));
}
- return st;
+ LOG(INFO) << "close load " << *this << ", success_tablet_num=" <<
success_tablet_ids->size()
+ << ", failed_tablet_num=" << failed_tablet_ids->size();
+ return Status::OK();
}
void LoadStream::_report_result(StreamId stream, const Status& st,
@@ -424,26 +437,7 @@ Status LoadStream::_append_data(const PStreamHeader& header, butil::IOBuf* data)
index_stream = it->second;
}
- Status st = Status::OK();
- {
- bthread::Mutex mutex;
- std::unique_lock<bthread::Mutex> lock(mutex);
- bthread::ConditionVariable cond;
- bool ret = _load_stream_mgr->heavy_work_pool()->try_offer(
- [this, &index_stream, &header, &data, &mutex, &cond, &st] {
- signal::set_signal_task_id(_load_id);
- st = index_stream->append_data(header, data);
- std::unique_lock<bthread::Mutex> lock(mutex);
- cond.notify_one();
- });
- if (ret) {
- cond.wait(lock);
- } else {
- return Status::Error<ErrorCode::INTERNAL_ERROR>(
- "there is not enough thread resource for append data");
- }
- }
- return st;
+ return index_stream->append_data(header, data);
}
int LoadStream::on_received_messages(StreamId id, butil::IOBuf* const
messages[], size_t size) {
diff --git a/be/src/runtime/load_stream.h b/be/src/runtime/load_stream.h
index a4d359dc0ea..7e16fe417ca 100644
--- a/be/src/runtime/load_stream.h
+++ b/be/src/runtime/load_stream.h
@@ -69,6 +69,7 @@ private:
RuntimeProfile::Counter* _append_data_timer = nullptr;
RuntimeProfile::Counter* _add_segment_timer = nullptr;
RuntimeProfile::Counter* _close_wait_timer = nullptr;
+ LoadStreamMgr* _load_stream_mgr = nullptr;
};
using TabletStreamSharedPtr = std::shared_ptr<TabletStream>;
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]