This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 18c9ebce951 [improve](move-memtable) tweak load stream flush token num 
and max tasks (#28884)
18c9ebce951 is described below

commit 18c9ebce95190394c7f313bb340d1ad96e956dc8
Author: Kaijie Chen <[email protected]>
AuthorDate: Fri Dec 22 20:08:47 2023 +0800

    [improve](move-memtable) tweak load stream flush token num and max tasks 
(#28884)
---
 be/src/common/config.cpp           | 2 +-
 be/src/runtime/load_stream.cpp     | 5 +----
 be/src/runtime/load_stream_mgr.cpp | 4 +++-
 be/src/runtime/load_stream_mgr.h   | 9 +++++++--
 4 files changed, 12 insertions(+), 8 deletions(-)

diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp
index 11921eac8b5..1e22217262b 100644
--- a/be/src/common/config.cpp
+++ b/be/src/common/config.cpp
@@ -777,7 +777,7 @@ DEFINE_Int32(load_stream_messages_in_batch, "128");
 // brpc streaming StreamWait seconds on EAGAIN
 DEFINE_Int32(load_stream_eagain_wait_seconds, "60");
 // max tasks per flush token in load stream
-DEFINE_Int32(load_stream_flush_token_max_tasks, "2");
+DEFINE_Int32(load_stream_flush_token_max_tasks, "5");
 
 // max send batch parallelism for OlapTableSink
 // The value set by the user for send_batch_parallelism is not allowed to 
exceed max_send_batch_parallelism_per_job,
diff --git a/be/src/runtime/load_stream.cpp b/be/src/runtime/load_stream.cpp
index 313728091c1..0c7991bde97 100644
--- a/be/src/runtime/load_stream.cpp
+++ b/be/src/runtime/load_stream.cpp
@@ -46,10 +46,7 @@ namespace doris {
 TabletStream::TabletStream(PUniqueId load_id, int64_t id, int64_t txn_id,
                            LoadStreamMgr* load_stream_mgr, RuntimeProfile* 
profile)
         : _id(id), _next_segid(0), _load_id(load_id), _txn_id(txn_id) {
-    for (int i = 0; i < 10; i++) {
-        _flush_tokens.emplace_back(load_stream_mgr->new_token());
-    }
-
+    load_stream_mgr->create_tokens(_flush_tokens);
     _failed_st = std::make_shared<Status>();
     _profile = profile->create_child(fmt::format("TabletStream {}", id), true, 
true);
     _append_data_timer = ADD_TIMER(_profile, "AppendDataTime");
diff --git a/be/src/runtime/load_stream_mgr.cpp 
b/be/src/runtime/load_stream_mgr.cpp
index b3553046aec..8d9d37c5d3a 100644
--- a/be/src/runtime/load_stream_mgr.cpp
+++ b/be/src/runtime/load_stream_mgr.cpp
@@ -34,7 +34,9 @@ namespace doris {
 
 LoadStreamMgr::LoadStreamMgr(uint32_t segment_file_writer_thread_num,
                              FifoThreadPool* heavy_work_pool, FifoThreadPool* 
light_work_pool)
-        : _heavy_work_pool(heavy_work_pool), _light_work_pool(light_work_pool) 
{
+        : _num_threads(segment_file_writer_thread_num),
+          _heavy_work_pool(heavy_work_pool),
+          _light_work_pool(light_work_pool) {
     static_cast<void>(ThreadPoolBuilder("SegmentFileWriterThreadPool")
                               .set_min_threads(segment_file_writer_thread_num)
                               .set_max_threads(segment_file_writer_thread_num)
diff --git a/be/src/runtime/load_stream_mgr.h b/be/src/runtime/load_stream_mgr.h
index 466a23c8c5c..ff742012774 100644
--- a/be/src/runtime/load_stream_mgr.h
+++ b/be/src/runtime/load_stream_mgr.h
@@ -41,8 +41,11 @@ public:
     Status open_load_stream(const POpenLoadStreamRequest* request,
                             LoadStreamSharedPtr& load_stream);
     void clear_load(UniqueId loadid);
-    std::unique_ptr<ThreadPoolToken> new_token() {
-        return 
_file_writer_thread_pool->new_token(ThreadPool::ExecutionMode::SERIAL);
+    void create_tokens(std::vector<std::unique_ptr<ThreadPoolToken>>& tokens) {
+        for (int i = 0; i < _num_threads * 2; i++) {
+            tokens.push_back(
+                    
_file_writer_thread_pool->new_token(ThreadPool::ExecutionMode::SERIAL));
+        }
     }
 
     // only used by ut
@@ -56,6 +59,8 @@ private:
     std::unordered_map<UniqueId, LoadStreamSharedPtr> _load_streams_map;
     std::unique_ptr<ThreadPool> _file_writer_thread_pool;
 
+    uint32_t _num_threads = 0;
+
     FifoThreadPool* _heavy_work_pool = nullptr;
     FifoThreadPool* _light_work_pool = nullptr;
 };


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to