This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.1 by this push:
new d4c1b39d039 [fix](multi table) restrict the multi tables load memory
under high concurrency with a large number of tables (#39992) (#41131)
d4c1b39d039 is described below
commit d4c1b39d039f064d2016d9727ea67ce9d6d482b1
Author: hui lai <[email protected]>
AuthorDate: Tue Sep 24 16:34:32 2024 +0800
[fix](multi table) restrict the multi tables load memory under high
concurrency with a large number of tables (#39992) (#41131)
Cherry-pick of #39992.
The BE node was killed by the OOM killer when multi-table load ran under high
concurrency with a large number of tables (128 concurrent loads, each loading
200 tables).
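This PR restricts the memory used by multi-table load in this scenario. When
the process exceeds its soft memory limit, or when LOAD-type memory exceeds
load_process_max_memory_limit_percent of the process memory limit, new routine
load tasks are rejected and return immediately. The StreamLoadPipe memory
tracker is also switched from the GLOBAL type to the LOAD type, so that its
usage is counted in that LOAD total.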
---
be/src/runtime/exec_env_init.cpp | 4 ++--
.../routine_load/routine_load_task_executor.cpp | 18 ++++++++++++++++--
.../runtime/routine_load/routine_load_task_executor.h | 5 ++++-
be/test/runtime/routine_load_task_executor_test.cpp | 2 +-
4 files changed, 23 insertions(+), 6 deletions(-)
diff --git a/be/src/runtime/exec_env_init.cpp b/be/src/runtime/exec_env_init.cpp
index 5d2ab598b33..84d0684f17d 100644
--- a/be/src/runtime/exec_env_init.cpp
+++ b/be/src/runtime/exec_env_init.cpp
@@ -239,7 +239,7 @@ Status ExecEnv::_init(const std::vector<StorePath>&
store_paths,
new
BrpcClientCache<PFunctionService_Stub>(config::function_service_protocol);
_stream_load_executor = StreamLoadExecutor::create_shared(this);
_routine_load_task_executor = new RoutineLoadTaskExecutor(this);
- RETURN_IF_ERROR(_routine_load_task_executor->init());
+ RETURN_IF_ERROR(_routine_load_task_executor->init(MemInfo::mem_limit()));
_small_file_mgr = new SmallFileMgr(this, config::small_file_dir);
_block_spill_mgr = new BlockSpillManager(store_paths);
_group_commit_mgr = new GroupCommitMgr(this);
@@ -540,7 +540,7 @@ void ExecEnv::init_mem_tracker() {
_s3_file_buffer_tracker =
MemTrackerLimiter::create_shared(MemTrackerLimiter::Type::GLOBAL,
"S3FileBuffer");
_stream_load_pipe_tracker =
- MemTrackerLimiter::create_shared(MemTrackerLimiter::Type::GLOBAL,
"StreamLoadPipe");
+ MemTrackerLimiter::create_shared(MemTrackerLimiter::Type::LOAD,
"StreamLoadPipe");
}
void ExecEnv::_register_metrics() {
diff --git a/be/src/runtime/routine_load/routine_load_task_executor.cpp
b/be/src/runtime/routine_load/routine_load_task_executor.cpp
index 9b475ed2133..e12ef7ff6df 100644
--- a/be/src/runtime/routine_load/routine_load_task_executor.cpp
+++ b/be/src/runtime/routine_load/routine_load_task_executor.cpp
@@ -75,7 +75,8 @@ RoutineLoadTaskExecutor::~RoutineLoadTaskExecutor() {
_task_map.clear();
}
-Status RoutineLoadTaskExecutor::init() {
+Status RoutineLoadTaskExecutor::init(int64_t process_mem_limit) {
+ _load_mem_limit = process_mem_limit *
config::load_process_max_memory_limit_percent / 100;
return ThreadPoolBuilder("routine_load")
.set_min_threads(0)
.set_max_threads(config::max_routine_load_thread_pool_size)
@@ -210,7 +211,7 @@ Status RoutineLoadTaskExecutor::submit_task(const
TRoutineLoadTask& task) {
return Status::OK();
}
- if (_task_map.size() >= config::max_routine_load_thread_pool_size) {
+ if (_task_map.size() >= config::max_routine_load_thread_pool_size ||
_reach_memory_limit()) {
LOG(INFO) << "too many tasks in thread pool. reject task: " <<
UniqueId(task.id)
<< ", job id: " << task.job_id
<< ", queue size: " << _thread_pool->get_queue_size()
@@ -305,6 +306,19 @@ Status RoutineLoadTaskExecutor::submit_task(const
TRoutineLoadTask& task) {
}
}
+bool RoutineLoadTaskExecutor::_reach_memory_limit() {
+ bool is_exceed_soft_mem_limit =
GlobalMemoryArbitrator::is_exceed_soft_mem_limit();
+ auto current_load_mem_value =
+
MemTrackerLimiter::TypeMemSum[MemTrackerLimiter::Type::LOAD]->current_value();
+ if (is_exceed_soft_mem_limit || current_load_mem_value > _load_mem_limit) {
+ LOG(INFO) << "is_exceed_soft_mem_limit: " << is_exceed_soft_mem_limit
+ << " current_load_mem_value: " << current_load_mem_value
+ << " _load_mem_limit: " << _load_mem_limit;
+ return true;
+ }
+ return false;
+}
+
void RoutineLoadTaskExecutor::exec_task(std::shared_ptr<StreamLoadContext> ctx,
DataConsumerPool* consumer_pool,
ExecFinishCallback cb) {
#define HANDLE_ERROR(stmt, err_msg) \
diff --git a/be/src/runtime/routine_load/routine_load_task_executor.h
b/be/src/runtime/routine_load/routine_load_task_executor.h
index f16ef80ef76..0e597d796c9 100644
--- a/be/src/runtime/routine_load/routine_load_task_executor.h
+++ b/be/src/runtime/routine_load/routine_load_task_executor.h
@@ -51,7 +51,7 @@ public:
~RoutineLoadTaskExecutor();
- Status init();
+ Status init(int64_t process_mem_limit);
void stop();
@@ -86,6 +86,7 @@ private:
// create a dummy StreamLoadContext for PKafkaMetaProxyRequest
Status _prepare_ctx(const PKafkaMetaProxyRequest& request,
std::shared_ptr<StreamLoadContext> ctx);
+ bool _reach_memory_limit();
private:
ExecEnv* _exec_env = nullptr;
@@ -95,6 +96,8 @@ private:
std::mutex _lock;
// task id -> load context
std::unordered_map<UniqueId, std::shared_ptr<StreamLoadContext>> _task_map;
+
+ int64_t _load_mem_limit = -1;
};
} // namespace doris
diff --git a/be/test/runtime/routine_load_task_executor_test.cpp
b/be/test/runtime/routine_load_task_executor_test.cpp
index f95fdcfdadf..338b82c6eba 100644
--- a/be/test/runtime/routine_load_task_executor_test.cpp
+++ b/be/test/runtime/routine_load_task_executor_test.cpp
@@ -94,7 +94,7 @@ TEST_F(RoutineLoadTaskExecutorTest, exec_task) {
RoutineLoadTaskExecutor executor(&_env);
Status st;
- st = executor.init();
+ st = executor.init(1024 * 1024);
EXPECT_TRUE(st.ok());
// submit task
st = executor.submit_task(task);
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org