This is an automated email from the ASF dual-hosted git repository.
yangzhg pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git
The following commit(s) were added to refs/heads/master by this push:
new 128752b [Routine load] Fix kafka load too many task bug (#5327)
128752b is described below
commit 128752b4f9624685adb4cbf1f6a3e79b47c1eb5a
Author: wyb <[email protected]>
AuthorDate: Wed Feb 3 13:23:30 2021 +0800
[Routine load] Fix kafka load too many task bug (#5327)
---
be/src/runtime/routine_load/routine_load_task_executor.cpp | 9 +++++----
be/test/runtime/routine_load_task_executor_test.cpp | 3 +++
2 files changed, 8 insertions(+), 4 deletions(-)
diff --git a/be/src/runtime/routine_load/routine_load_task_executor.cpp b/be/src/runtime/routine_load/routine_load_task_executor.cpp
index 74a0aa0..a54f657 100644
--- a/be/src/runtime/routine_load/routine_load_task_executor.cpp
+++ b/be/src/runtime/routine_load/routine_load_task_executor.cpp
@@ -37,7 +37,8 @@ DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(routine_load_task_count, MetricUnit::NOUNIT);
RoutineLoadTaskExecutor::RoutineLoadTaskExecutor(ExecEnv* exec_env)
: _exec_env(exec_env),
- _thread_pool(config::routine_load_thread_pool_size, 1),
+ _thread_pool(config::routine_load_thread_pool_size,
+ config::routine_load_thread_pool_size),
_data_consumer_pool(10) {
REGISTER_HOOK_METRIC(routine_load_task_count, [this]() {
std::lock_guard<std::mutex> l(_lock);
@@ -105,11 +106,11 @@ Status RoutineLoadTaskExecutor::submit_task(const TRoutineLoadTask& task) {
return Status::OK();
}
- // thread pool's queue size > 0 means there are tasks waiting to be executed, so no more tasks should be submitted.
- if (_thread_pool.get_queue_size() > 0) {
+ if (_task_map.size() >= config::routine_load_thread_pool_size) {
LOG(INFO) << "too many tasks in thread pool. reject task: " << UniqueId(task.id)
<< ", job id: " << task.job_id
- << ", queue size: " << _thread_pool.get_queue_size();
+ << ", queue size: " << _thread_pool.get_queue_size()
+ << ", current tasks num: " << _task_map.size();
return Status::TooManyTasks(UniqueId(task.id).to_string());
}
diff --git a/be/test/runtime/routine_load_task_executor_test.cpp b/be/test/runtime/routine_load_task_executor_test.cpp
index 31e01a7..384a859 100644
--- a/be/test/runtime/routine_load_task_executor_test.cpp
+++ b/be/test/runtime/routine_load_task_executor_test.cpp
@@ -51,6 +51,9 @@ public:
_env._master_info = new TMasterInfo();
_env._load_stream_mgr = new LoadStreamMgr();
_env._stream_load_executor = new StreamLoadExecutor(&_env);
+
+ config::routine_load_thread_pool_size = 5;
+ config::max_consumer_num_per_group = 3;
}
void TearDown() override {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]