This is an automated email from the ASF dual-hosted git repository.

yangzhg pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 128752b  [Routine load] Fix kafka load too many task bug (#5327)
128752b is described below

commit 128752b4f9624685adb4cbf1f6a3e79b47c1eb5a
Author: wyb <[email protected]>
AuthorDate: Wed Feb 3 13:23:30 2021 +0800

    [Routine load] Fix kafka load too many task bug (#5327)
---
 be/src/runtime/routine_load/routine_load_task_executor.cpp | 9 +++++----
 be/test/runtime/routine_load_task_executor_test.cpp        | 3 +++
 2 files changed, 8 insertions(+), 4 deletions(-)

diff --git a/be/src/runtime/routine_load/routine_load_task_executor.cpp b/be/src/runtime/routine_load/routine_load_task_executor.cpp
index 74a0aa0..a54f657 100644
--- a/be/src/runtime/routine_load/routine_load_task_executor.cpp
+++ b/be/src/runtime/routine_load/routine_load_task_executor.cpp
@@ -37,7 +37,8 @@ DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(routine_load_task_count, MetricUnit::NOUNIT);
 
 RoutineLoadTaskExecutor::RoutineLoadTaskExecutor(ExecEnv* exec_env)
         : _exec_env(exec_env),
-          _thread_pool(config::routine_load_thread_pool_size, 1),
+          _thread_pool(config::routine_load_thread_pool_size,
+                       config::routine_load_thread_pool_size),
           _data_consumer_pool(10) {
     REGISTER_HOOK_METRIC(routine_load_task_count, [this]() {
         std::lock_guard<std::mutex> l(_lock);
@@ -105,11 +106,11 @@ Status RoutineLoadTaskExecutor::submit_task(const TRoutineLoadTask& task) {
         return Status::OK();
     }
 
-    // thread pool's queue size > 0 means there are tasks waiting to be executed, so no more tasks should be submitted.
-    if (_thread_pool.get_queue_size() > 0) {
+    if (_task_map.size() >= config::routine_load_thread_pool_size) {
         LOG(INFO) << "too many tasks in thread pool. reject task: " << UniqueId(task.id)
                   << ", job id: " << task.job_id
-                  << ", queue size: " << _thread_pool.get_queue_size();
+                  << ", queue size: " << _thread_pool.get_queue_size()
+                  << ", current tasks num: " << _task_map.size();
         return Status::TooManyTasks(UniqueId(task.id).to_string());
     }
 
diff --git a/be/test/runtime/routine_load_task_executor_test.cpp b/be/test/runtime/routine_load_task_executor_test.cpp
index 31e01a7..384a859 100644
--- a/be/test/runtime/routine_load_task_executor_test.cpp
+++ b/be/test/runtime/routine_load_task_executor_test.cpp
@@ -51,6 +51,9 @@ public:
         _env._master_info = new TMasterInfo();
         _env._load_stream_mgr = new LoadStreamMgr();
         _env._stream_load_executor = new StreamLoadExecutor(&_env);
+
+        config::routine_load_thread_pool_size = 5;
+        config::max_consumer_num_per_group = 3;
     }
 
     void TearDown() override {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to