This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 08a1affde45 [fix](multi table) restrict the multi tables load memory 
under high concurrency with a large number of tables (#39992)
08a1affde45 is described below

commit 08a1affde4520dd8ac711363c44ffb2524d3739a
Author: hui lai <[email protected]>
AuthorDate: Fri Aug 30 12:23:35 2024 +0800

    [fix](multi table) restrict the multi tables load memory under high 
concurrency with a large number of tables (#39992)
    
    The BE node was killed by the OOM killer when using multi-table load
    under high concurrency with a large number of tables (128 concurrent
    loads, each loading 200 tables).
    
    This PR restricts the memory used by multi-table loads in this
    scenario. If memory reaches the hard limit, new tasks are rejected and
    the submission returns directly.
---
 be/src/runtime/exec_env_init.cpp                       |  4 ++--
 .../routine_load/routine_load_task_executor.cpp        | 18 ++++++++++++++++--
 .../runtime/routine_load/routine_load_task_executor.h  |  5 ++++-
 be/test/runtime/routine_load_task_executor_test.cpp    |  2 +-
 4 files changed, 23 insertions(+), 6 deletions(-)

diff --git a/be/src/runtime/exec_env_init.cpp b/be/src/runtime/exec_env_init.cpp
index 53fe1993139..674d5ee5115 100644
--- a/be/src/runtime/exec_env_init.cpp
+++ b/be/src/runtime/exec_env_init.cpp
@@ -298,7 +298,7 @@ Status ExecEnv::_init(const std::vector<StorePath>& 
store_paths,
         _stream_load_executor = StreamLoadExecutor::create_shared(this);
     }
     _routine_load_task_executor = new RoutineLoadTaskExecutor(this);
-    RETURN_IF_ERROR(_routine_load_task_executor->init());
+    RETURN_IF_ERROR(_routine_load_task_executor->init(MemInfo::mem_limit()));
     _small_file_mgr = new SmallFileMgr(this, config::small_file_dir);
     _group_commit_mgr = new GroupCommitMgr(this);
     _memtable_memory_limiter = std::make_unique<MemTableMemoryLimiter>();
@@ -607,7 +607,7 @@ void ExecEnv::init_mem_tracker() {
     _s3_file_buffer_tracker =
             MemTrackerLimiter::create_shared(MemTrackerLimiter::Type::GLOBAL, 
"S3FileBuffer");
     _stream_load_pipe_tracker =
-            MemTrackerLimiter::create_shared(MemTrackerLimiter::Type::GLOBAL, 
"StreamLoadPipe");
+            MemTrackerLimiter::create_shared(MemTrackerLimiter::Type::LOAD, 
"StreamLoadPipe");
 }
 
 void ExecEnv::_register_metrics() {
diff --git a/be/src/runtime/routine_load/routine_load_task_executor.cpp 
b/be/src/runtime/routine_load/routine_load_task_executor.cpp
index 958ad37f8d2..b63495df837 100644
--- a/be/src/runtime/routine_load/routine_load_task_executor.cpp
+++ b/be/src/runtime/routine_load/routine_load_task_executor.cpp
@@ -75,7 +75,8 @@ RoutineLoadTaskExecutor::~RoutineLoadTaskExecutor() {
     _task_map.clear();
 }
 
-Status RoutineLoadTaskExecutor::init() {
+Status RoutineLoadTaskExecutor::init(int64_t process_mem_limit) {
+    _load_mem_limit = process_mem_limit * 
config::load_process_max_memory_limit_percent / 100;
     return ThreadPoolBuilder("routine_load")
             .set_min_threads(0)
             .set_max_threads(config::max_routine_load_thread_pool_size)
@@ -210,7 +211,7 @@ Status RoutineLoadTaskExecutor::submit_task(const 
TRoutineLoadTask& task) {
         return Status::OK();
     }
 
-    if (_task_map.size() >= config::max_routine_load_thread_pool_size) {
+    if (_task_map.size() >= config::max_routine_load_thread_pool_size || 
_reach_memory_limit()) {
         LOG(INFO) << "too many tasks in thread pool. reject task: " << 
UniqueId(task.id)
                   << ", job id: " << task.job_id
                   << ", queue size: " << _thread_pool->get_queue_size()
@@ -311,6 +312,19 @@ Status RoutineLoadTaskExecutor::submit_task(const 
TRoutineLoadTask& task) {
     }
 }
 
+bool RoutineLoadTaskExecutor::_reach_memory_limit() {
+    bool is_exceed_soft_mem_limit = 
GlobalMemoryArbitrator::is_exceed_soft_mem_limit();
+    auto current_load_mem_value =
+            
MemTrackerLimiter::TypeMemSum[MemTrackerLimiter::Type::LOAD]->current_value();
+    if (is_exceed_soft_mem_limit || current_load_mem_value > _load_mem_limit) {
+        LOG(INFO) << "is_exceed_soft_mem_limit: " << is_exceed_soft_mem_limit
+                  << " current_load_mem_value: " << current_load_mem_value
+                  << " _load_mem_limit: " << _load_mem_limit;
+        return true;
+    }
+    return false;
+}
+
 void RoutineLoadTaskExecutor::exec_task(std::shared_ptr<StreamLoadContext> ctx,
                                         DataConsumerPool* consumer_pool, 
ExecFinishCallback cb) {
 #define HANDLE_ERROR(stmt, err_msg)                                        \
diff --git a/be/src/runtime/routine_load/routine_load_task_executor.h 
b/be/src/runtime/routine_load/routine_load_task_executor.h
index f16ef80ef76..0e597d796c9 100644
--- a/be/src/runtime/routine_load/routine_load_task_executor.h
+++ b/be/src/runtime/routine_load/routine_load_task_executor.h
@@ -51,7 +51,7 @@ public:
 
     ~RoutineLoadTaskExecutor();
 
-    Status init();
+    Status init(int64_t process_mem_limit);
 
     void stop();
 
@@ -86,6 +86,7 @@ private:
     // create a dummy StreamLoadContext for PKafkaMetaProxyRequest
     Status _prepare_ctx(const PKafkaMetaProxyRequest& request,
                         std::shared_ptr<StreamLoadContext> ctx);
+    bool _reach_memory_limit();
 
 private:
     ExecEnv* _exec_env = nullptr;
@@ -95,6 +96,8 @@ private:
     std::mutex _lock;
     // task id -> load context
     std::unordered_map<UniqueId, std::shared_ptr<StreamLoadContext>> _task_map;
+
+    int64_t _load_mem_limit = -1;
 };
 
 } // namespace doris
diff --git a/be/test/runtime/routine_load_task_executor_test.cpp 
b/be/test/runtime/routine_load_task_executor_test.cpp
index f95fdcfdadf..338b82c6eba 100644
--- a/be/test/runtime/routine_load_task_executor_test.cpp
+++ b/be/test/runtime/routine_load_task_executor_test.cpp
@@ -94,7 +94,7 @@ TEST_F(RoutineLoadTaskExecutorTest, exec_task) {
 
     RoutineLoadTaskExecutor executor(&_env);
     Status st;
-    st = executor.init();
+    st = executor.init(1024 * 1024);
     EXPECT_TRUE(st.ok());
     // submit task
     st = executor.submit_task(task);


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to