This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 3b830f89a7f [improve](move-memtable) avoid using heavy work pool during append data (#28745)
3b830f89a7f is described below

commit 3b830f89a7fa212d250baba15547f56524f1d75f
Author: Kaijie Chen <[email protected]>
AuthorDate: Fri Dec 22 22:51:30 2023 +0800

    [improve](move-memtable) avoid using heavy work pool during append data (#28745)
---
 be/src/runtime/load_stream.cpp | 112 +++++++++++++++++++----------------------
 be/src/runtime/load_stream.h   |   1 +
 2 files changed, 54 insertions(+), 59 deletions(-)

diff --git a/be/src/runtime/load_stream.cpp b/be/src/runtime/load_stream.cpp
index 0c7991bde97..307cd4ef30b 100644
--- a/be/src/runtime/load_stream.cpp
+++ b/be/src/runtime/load_stream.cpp
@@ -45,7 +45,11 @@ namespace doris {
 
 TabletStream::TabletStream(PUniqueId load_id, int64_t id, int64_t txn_id,
                            LoadStreamMgr* load_stream_mgr, RuntimeProfile* 
profile)
-        : _id(id), _next_segid(0), _load_id(load_id), _txn_id(txn_id) {
+        : _id(id),
+          _next_segid(0),
+          _load_id(load_id),
+          _txn_id(txn_id),
+          _load_stream_mgr(load_stream_mgr) {
     load_stream_mgr->create_tokens(_flush_tokens);
     _failed_st = std::make_shared<Status>();
     _profile = profile->create_child(fmt::format("TabletStream {}", id), true, 
true);
@@ -125,6 +129,7 @@ Status TabletStream::append_data(const PStreamHeader& 
header, butil::IOBuf* data
     DCHECK(new_segid != std::numeric_limits<uint32_t>::max());
     butil::IOBuf buf = data->movable();
     auto flush_func = [this, new_segid, eos, buf, header]() {
+        signal::set_signal_task_id(_load_id);
         auto st = _load_stream_writer->append_data(new_segid, header.offset(), 
buf);
         if (eos && st.ok()) {
             st = _load_stream_writer->close_segment(new_segid);
@@ -166,6 +171,7 @@ Status TabletStream::add_segment(const PStreamHeader& 
header, butil::IOBuf* data
     DCHECK(new_segid != std::numeric_limits<uint32_t>::max());
 
     auto add_segment_func = [this, new_segid, stat, flush_schema]() {
+        signal::set_signal_task_id(_load_id);
         auto st = _load_stream_writer->add_segment(new_segid, stat, 
flush_schema);
         if (!st.ok() && _failed_st->ok()) {
             _failed_st = std::make_shared<Status>(st);
@@ -181,13 +187,44 @@ Status TabletStream::add_segment(const PStreamHeader& 
header, butil::IOBuf* data
 
 Status TabletStream::close() {
     SCOPED_TIMER(_close_wait_timer);
-    for (auto& token : _flush_tokens) {
-        token->wait();
+    bthread::Mutex mu;
+    std::unique_lock<bthread::Mutex> lock(mu);
+    bthread::ConditionVariable cv;
+    auto wait_func = [this, &mu, &cv] {
+        signal::set_signal_task_id(_load_id);
+        for (auto& token : _flush_tokens) {
+            token->wait();
+        }
+        std::lock_guard<bthread::Mutex> lock(mu);
+        cv.notify_one();
+    };
+    bool ret = _load_stream_mgr->heavy_work_pool()->try_offer(wait_func);
+    if (ret) {
+        cv.wait(lock);
+    } else {
+        return Status::Error<ErrorCode::INTERNAL_ERROR>(
+                "there is not enough thread resource for close load");
     }
+
     if (!_failed_st->ok()) {
         return *_failed_st;
     }
-    return _load_stream_writer->close();
+
+    Status st = Status::OK();
+    auto close_func = [this, &mu, &cv, &st]() {
+        signal::set_signal_task_id(_load_id);
+        st = _load_stream_writer->close();
+        std::lock_guard<bthread::Mutex> lock(mu);
+        cv.notify_one();
+    };
+    ret = _load_stream_mgr->heavy_work_pool()->try_offer(close_func);
+    if (ret) {
+        cv.wait(lock);
+    } else {
+        return Status::Error<ErrorCode::INTERNAL_ERROR>(
+                "there is not enough thread resource for close load");
+    }
+    return st;
 }
 
 IndexStream::IndexStream(PUniqueId load_id, int64_t id, int64_t txn_id,
@@ -244,13 +281,13 @@ Status IndexStream::close(const std::vector<PTabletID>& 
tablets_to_commit,
         }
     }
 
-    for (auto& it : _tablet_streams_map) {
-        auto st = it.second->close();
+    for (auto& [_, tablet_stream] : _tablet_streams_map) {
+        auto st = tablet_stream->close();
         if (st.ok()) {
-            success_tablet_ids->push_back(it.second->id());
+            success_tablet_ids->push_back(tablet_stream->id());
         } else {
-            LOG(INFO) << "close tablet stream " << *it.second << ", status=" 
<< st;
-            failed_tablet_ids->push_back(it.second->id());
+            LOG(INFO) << "close tablet stream " << *tablet_stream << ", 
status=" << st;
+            failed_tablet_ids->push_back(tablet_stream->id());
         }
     }
     return Status::OK();
@@ -308,37 +345,13 @@ Status LoadStream::close(int64_t src_id, const 
std::vector<PTabletID>& tablets_t
         return Status::OK();
     }
 
-    Status st = Status::OK();
-    {
-        bthread::Mutex mutex;
-        std::unique_lock<bthread::Mutex> lock(mutex);
-        bthread::ConditionVariable cond;
-        bool ret = _load_stream_mgr->heavy_work_pool()->try_offer(
-                [this, &success_tablet_ids, &failed_tablet_ids, &mutex, &cond, 
&st]() {
-                    signal::set_signal_task_id(_load_id);
-                    for (auto& it : _index_streams_map) {
-                        st = it.second->close(_tablets_to_commit, 
success_tablet_ids,
-                                              failed_tablet_ids);
-                        if (!st.ok()) {
-                            std::unique_lock<bthread::Mutex> lock(mutex);
-                            cond.notify_one();
-                            return;
-                        }
-                    }
-                    LOG(INFO) << "close load " << *this
-                              << ", success_tablet_num=" << 
success_tablet_ids->size()
-                              << ", failed_tablet_num=" << 
failed_tablet_ids->size();
-                    std::unique_lock<bthread::Mutex> lock(mutex);
-                    cond.notify_one();
-                });
-        if (ret) {
-            cond.wait(lock);
-        } else {
-            return Status::Error<ErrorCode::INTERNAL_ERROR>(
-                    "there is not enough thread resource for close load");
-        }
+    for (auto& [_, index_stream] : _index_streams_map) {
+        RETURN_IF_ERROR(
+                index_stream->close(_tablets_to_commit, success_tablet_ids, 
failed_tablet_ids));
     }
-    return st;
+    LOG(INFO) << "close load " << *this << ", success_tablet_num=" << 
success_tablet_ids->size()
+              << ", failed_tablet_num=" << failed_tablet_ids->size();
+    return Status::OK();
 }
 
 void LoadStream::_report_result(StreamId stream, const Status& st,
@@ -424,26 +437,7 @@ Status LoadStream::_append_data(const PStreamHeader& 
header, butil::IOBuf* data)
         index_stream = it->second;
     }
 
-    Status st = Status::OK();
-    {
-        bthread::Mutex mutex;
-        std::unique_lock<bthread::Mutex> lock(mutex);
-        bthread::ConditionVariable cond;
-        bool ret = _load_stream_mgr->heavy_work_pool()->try_offer(
-                [this, &index_stream, &header, &data, &mutex, &cond, &st] {
-                    signal::set_signal_task_id(_load_id);
-                    st = index_stream->append_data(header, data);
-                    std::unique_lock<bthread::Mutex> lock(mutex);
-                    cond.notify_one();
-                });
-        if (ret) {
-            cond.wait(lock);
-        } else {
-            return Status::Error<ErrorCode::INTERNAL_ERROR>(
-                    "there is not enough thread resource for append data");
-        }
-    }
-    return st;
+    return index_stream->append_data(header, data);
 }
 
 int LoadStream::on_received_messages(StreamId id, butil::IOBuf* const 
messages[], size_t size) {
diff --git a/be/src/runtime/load_stream.h b/be/src/runtime/load_stream.h
index a4d359dc0ea..7e16fe417ca 100644
--- a/be/src/runtime/load_stream.h
+++ b/be/src/runtime/load_stream.h
@@ -69,6 +69,7 @@ private:
     RuntimeProfile::Counter* _append_data_timer = nullptr;
     RuntimeProfile::Counter* _add_segment_timer = nullptr;
     RuntimeProfile::Counter* _close_wait_timer = nullptr;
+    LoadStreamMgr* _load_stream_mgr = nullptr;
 };
 
 using TabletStreamSharedPtr = std::shared_ptr<TabletStream>;


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to