This is an automated email from the ASF dual-hosted git repository.
dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 18c9ebce951 [improve](move-memtable) tweak load stream flush token num and max tasks (#28884)
18c9ebce951 is described below
commit 18c9ebce95190394c7f313bb340d1ad96e956dc8
Author: Kaijie Chen <[email protected]>
AuthorDate: Fri Dec 22 20:08:47 2023 +0800
[improve](move-memtable) tweak load stream flush token num and max tasks (#28884)
---
be/src/common/config.cpp | 2 +-
be/src/runtime/load_stream.cpp | 5 +----
be/src/runtime/load_stream_mgr.cpp | 4 +++-
be/src/runtime/load_stream_mgr.h | 9 +++++++--
4 files changed, 12 insertions(+), 8 deletions(-)
diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp
index 11921eac8b5..1e22217262b 100644
--- a/be/src/common/config.cpp
+++ b/be/src/common/config.cpp
@@ -777,7 +777,7 @@ DEFINE_Int32(load_stream_messages_in_batch, "128");
// brpc streaming StreamWait seconds on EAGAIN
DEFINE_Int32(load_stream_eagain_wait_seconds, "60");
// max tasks per flush token in load stream
-DEFINE_Int32(load_stream_flush_token_max_tasks, "2");
+DEFINE_Int32(load_stream_flush_token_max_tasks, "5");
// max send batch parallelism for OlapTableSink
// The value set by the user for send_batch_parallelism is not allowed to
// exceed max_send_batch_parallelism_per_job,
diff --git a/be/src/runtime/load_stream.cpp b/be/src/runtime/load_stream.cpp
index 313728091c1..0c7991bde97 100644
--- a/be/src/runtime/load_stream.cpp
+++ b/be/src/runtime/load_stream.cpp
@@ -46,10 +46,7 @@ namespace doris {
TabletStream::TabletStream(PUniqueId load_id, int64_t id, int64_t txn_id,
                           LoadStreamMgr* load_stream_mgr, RuntimeProfile* profile)
: _id(id), _next_segid(0), _load_id(load_id), _txn_id(txn_id) {
- for (int i = 0; i < 10; i++) {
- _flush_tokens.emplace_back(load_stream_mgr->new_token());
- }
-
+ load_stream_mgr->create_tokens(_flush_tokens);
_failed_st = std::make_shared<Status>();
_profile = profile->create_child(fmt::format("TabletStream {}", id), true, true);
_append_data_timer = ADD_TIMER(_profile, "AppendDataTime");
diff --git a/be/src/runtime/load_stream_mgr.cpp
b/be/src/runtime/load_stream_mgr.cpp
index b3553046aec..8d9d37c5d3a 100644
--- a/be/src/runtime/load_stream_mgr.cpp
+++ b/be/src/runtime/load_stream_mgr.cpp
@@ -34,7 +34,9 @@ namespace doris {
LoadStreamMgr::LoadStreamMgr(uint32_t segment_file_writer_thread_num,
FifoThreadPool* heavy_work_pool, FifoThreadPool*
light_work_pool)
- : _heavy_work_pool(heavy_work_pool), _light_work_pool(light_work_pool) {
+ : _num_threads(segment_file_writer_thread_num),
+ _heavy_work_pool(heavy_work_pool),
+ _light_work_pool(light_work_pool) {
static_cast<void>(ThreadPoolBuilder("SegmentFileWriterThreadPool")
.set_min_threads(segment_file_writer_thread_num)
.set_max_threads(segment_file_writer_thread_num)
diff --git a/be/src/runtime/load_stream_mgr.h b/be/src/runtime/load_stream_mgr.h
index 466a23c8c5c..ff742012774 100644
--- a/be/src/runtime/load_stream_mgr.h
+++ b/be/src/runtime/load_stream_mgr.h
@@ -41,8 +41,11 @@ public:
Status open_load_stream(const POpenLoadStreamRequest* request,
LoadStreamSharedPtr& load_stream);
void clear_load(UniqueId loadid);
- std::unique_ptr<ThreadPoolToken> new_token() {
- return _file_writer_thread_pool->new_token(ThreadPool::ExecutionMode::SERIAL);
+ void create_tokens(std::vector<std::unique_ptr<ThreadPoolToken>>& tokens) {
+ for (int i = 0; i < _num_threads * 2; i++) {
+ tokens.push_back(
+         _file_writer_thread_pool->new_token(ThreadPool::ExecutionMode::SERIAL));
+ }
}
// only used by ut
@@ -56,6 +59,8 @@ private:
std::unordered_map<UniqueId, LoadStreamSharedPtr> _load_streams_map;
std::unique_ptr<ThreadPool> _file_writer_thread_pool;
+ uint32_t _num_threads = 0;
+
FifoThreadPool* _heavy_work_pool = nullptr;
FifoThreadPool* _light_work_pool = nullptr;
};
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]