This is an automated email from the ASF dual-hosted git repository.
liaoxin pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.1 by this push:
new 1a248952578 [opt](routine-load) optimize routine load task thread
pool and related param(#32282) (#34896)
1a248952578 is described below
commit 1a2489525780136a921fde22cb47682d05cc24e0
Author: HHoflittlefish777 <[email protected]>
AuthorDate: Wed May 15 12:42:02 2024 +0800
[opt](routine-load) optimize routine load task thread pool and related
param(#32282) (#34896)
---
be/src/common/config.cpp | 4 ++--
be/src/common/config.h | 4 ++--
be/src/runtime/exec_env_init.cpp | 1 +
.../routine_load/routine_load_task_executor.cpp | 24 ++++++++++++++--------
.../routine_load/routine_load_task_executor.h | 6 ++++--
.../runtime/routine_load_task_executor_test.cpp | 6 ++++--
.../main/java/org/apache/doris/common/Config.java | 4 ++--
7 files changed, 30 insertions(+), 19 deletions(-)
diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp
index 432b0bcb03e..ebb30e9d8e1 100644
--- a/be/src/common/config.cpp
+++ b/be/src/common/config.cpp
@@ -615,9 +615,9 @@ DEFINE_Bool(enable_metric_calculator, "true");
// max consumer num in one data consumer group, for routine load
DEFINE_mInt32(max_consumer_num_per_group, "3");
-// the size of thread pool for routine load task.
+// the max size of thread pool for routine load task.
// this should be larger than FE config 'max_routine_load_task_num_per_be' (default 5)
-DEFINE_Int32(routine_load_thread_pool_size, "10");
+DEFINE_Int32(max_routine_load_thread_pool_size, "1024");
// max external scan cache batch count, means cache max_memory_cache_batch_count * batch_size row
// default is 20, batch_size's default value is 1024 means 20 * 1024 rows will be cached
diff --git a/be/src/common/config.h b/be/src/common/config.h
index 403454abbb1..6d22ec483cb 100644
--- a/be/src/common/config.h
+++ b/be/src/common/config.h
@@ -668,9 +668,9 @@ DECLARE_Bool(enable_metric_calculator);
// max consumer num in one data consumer group, for routine load
DECLARE_mInt32(max_consumer_num_per_group);
-// the size of thread pool for routine load task.
+// the max size of thread pool for routine load task.
// this should be larger than FE config 'max_routine_load_task_num_per_be' (default 5)
-DECLARE_Int32(routine_load_thread_pool_size);
+DECLARE_Int32(max_routine_load_thread_pool_size);
// max external scan cache batch count, means cache max_memory_cache_batch_count * batch_size row
// default is 20, batch_size's default value is 1024 means 20 * 1024 rows will be cached
diff --git a/be/src/runtime/exec_env_init.cpp b/be/src/runtime/exec_env_init.cpp
index 2757ed456ff..9c1fa426345 100644
--- a/be/src/runtime/exec_env_init.cpp
+++ b/be/src/runtime/exec_env_init.cpp
@@ -228,6 +228,7 @@ Status ExecEnv::_init(const std::vector<StorePath>& store_paths,
_function_client_cache = new BrpcClientCache<PFunctionService_Stub>();
_stream_load_executor = StreamLoadExecutor::create_shared(this);
_routine_load_task_executor = new RoutineLoadTaskExecutor(this);
+ RETURN_IF_ERROR(_routine_load_task_executor->init());
_small_file_mgr = new SmallFileMgr(this, config::small_file_dir);
_block_spill_mgr = new BlockSpillManager(store_paths);
_group_commit_mgr = new GroupCommitMgr(this);
diff --git a/be/src/runtime/routine_load/routine_load_task_executor.cpp b/be/src/runtime/routine_load/routine_load_task_executor.cpp
index 0534531aed2..42f5db72fdb 100644
--- a/be/src/runtime/routine_load/routine_load_task_executor.cpp
+++ b/be/src/runtime/routine_load/routine_load_task_executor.cpp
@@ -62,10 +62,7 @@ using namespace ErrorCode;
DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(routine_load_task_count, MetricUnit::NOUNIT);
RoutineLoadTaskExecutor::RoutineLoadTaskExecutor(ExecEnv* exec_env)
- : _exec_env(exec_env),
- _thread_pool(config::routine_load_thread_pool_size, config::routine_load_thread_pool_size,
- "routine_load"),
- _data_consumer_pool(config::routine_load_consumer_pool_size) {
+ : _exec_env(exec_env), _data_consumer_pool(config::routine_load_consumer_pool_size) {
REGISTER_HOOK_METRIC(routine_load_task_count, [this]() {
// std::lock_guard<std::mutex> l(_lock);
return _task_map.size();
@@ -79,10 +76,19 @@ RoutineLoadTaskExecutor::~RoutineLoadTaskExecutor() {
_task_map.clear();
}
+Status RoutineLoadTaskExecutor::init() {
+ return ThreadPoolBuilder("routine_load")
+ .set_min_threads(0)
+ .set_max_threads(config::max_routine_load_thread_pool_size)
+ .set_max_queue_size(config::max_routine_load_thread_pool_size)
+ .build(&_thread_pool);
+}
+
void RoutineLoadTaskExecutor::stop() {
DEREGISTER_HOOK_METRIC(routine_load_task_count);
- _thread_pool.shutdown();
- _thread_pool.join();
+ if (_thread_pool) {
+ _thread_pool->shutdown();
+ }
_data_consumer_pool.stop();
}
@@ -205,10 +211,10 @@ Status RoutineLoadTaskExecutor::submit_task(const TRoutineLoadTask& task) {
return Status::OK();
}
- if (_task_map.size() >= config::routine_load_thread_pool_size) {
+ if (_task_map.size() >= config::max_routine_load_thread_pool_size) {
LOG(INFO) << "too many tasks in thread pool. reject task: " << UniqueId(task.id)
<< ", job id: " << task.job_id
- << ", queue size: " << _thread_pool.get_queue_size()
+ << ", queue size: " << _thread_pool->get_queue_size()
<< ", current tasks num: " << _task_map.size();
return Status::TooManyTasks("{}_{}", UniqueId(task.id).to_string(),
BackendOptions::get_localhost());
@@ -278,7 +284,7 @@ Status RoutineLoadTaskExecutor::submit_task(const TRoutineLoadTask& task) {
_task_map[ctx->id] = ctx;
// offer the task to thread pool
- if (!_thread_pool.offer(std::bind<void>(
+ if (!_thread_pool->submit_func(std::bind<void>(
&RoutineLoadTaskExecutor::exec_task, this, ctx, &_data_consumer_pool,
[this](std::shared_ptr<StreamLoadContext> ctx) {
std::unique_lock<std::mutex> l(_lock);
diff --git a/be/src/runtime/routine_load/routine_load_task_executor.h b/be/src/runtime/routine_load/routine_load_task_executor.h
index 8f8a6f2d653..f16ef80ef76 100644
--- a/be/src/runtime/routine_load/routine_load_task_executor.h
+++ b/be/src/runtime/routine_load/routine_load_task_executor.h
@@ -27,8 +27,8 @@
#include <vector>
#include "runtime/routine_load/data_consumer_pool.h"
+#include "util/threadpool.h"
#include "util/uid_util.h"
-#include "util/work_thread_pool.hpp"
namespace doris {
@@ -51,6 +51,8 @@ public:
~RoutineLoadTaskExecutor();
+ Status init();
+
void stop();
// submit a routine load task
@@ -87,7 +89,7 @@ private:
private:
ExecEnv* _exec_env = nullptr;
- PriorityThreadPool _thread_pool;
+ std::unique_ptr<ThreadPool> _thread_pool;
DataConsumerPool _data_consumer_pool;
std::mutex _lock;
diff --git a/be/test/runtime/routine_load_task_executor_test.cpp b/be/test/runtime/routine_load_task_executor_test.cpp
index 8a8dcc4d677..f95fdcfdadf 100644
--- a/be/test/runtime/routine_load_task_executor_test.cpp
+++ b/be/test/runtime/routine_load_task_executor_test.cpp
@@ -59,7 +59,7 @@ public:
_env.set_new_load_stream_mgr(NewLoadStreamMgr::create_unique());
_env.set_stream_load_executor(StreamLoadExecutor::create_unique(&_env));
- config::routine_load_thread_pool_size = 5;
+ config::max_routine_load_thread_pool_size = 1024;
config::max_consumer_num_per_group = 3;
}
@@ -93,8 +93,10 @@ TEST_F(RoutineLoadTaskExecutorTest, exec_task) {
task.__set_kafka_load_info(k_info);
RoutineLoadTaskExecutor executor(&_env);
- // submit task
Status st;
+ st = executor.init();
+ EXPECT_TRUE(st.ok());
+ // submit task
st = executor.submit_task(task);
EXPECT_TRUE(st.ok());
diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
index 26708488ad7..6fc20578ec0 100644
--- a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
+++ b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
@@ -1150,8 +1150,8 @@ public class Config extends ConfigBase {
/**
* the max concurrent routine load task num per BE.
* This is to limit the num of routine load tasks sending to a BE, and it should also less
- * than BE config 'routine_load_thread_pool_size'(default 10),
- * which is the routine load task thread pool size on BE.
+ * than BE config 'max_routine_load_thread_pool_size'(default 1024),
+ * which is the routine load task thread pool max size on BE.
*/
@ConfField(mutable = true, masterOnly = true)
public static int max_routine_load_task_num_per_be = 5;
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]