This is an automated email from the ASF dual-hosted git repository.
diwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new b633fbe899a [enhance](memtable) support dynamic modification of flush
thread pool size (#60423)
b633fbe899a is described below
commit b633fbe899abbd614ff4da19b59927c7d598d194
Author: hui lai <[email protected]>
AuthorDate: Thu Feb 5 13:02:11 2026 +0800
[enhance](memtable) support dynamic modification of flush thread pool size
(#60423)
### What problem does this PR solve?
Support dynamic modification of the memtable flush thread pool size without restarting BE, by making the configs `flush_thread_num_per_store`, `high_priority_flush_thread_num_per_store`, and `max_flush_thread_num_per_cpu` mutable at runtime.
Usage Example:
```
# Update flush thread pool size without restarting BE
curl
"http://be_host:webserver_port/api/update_config?flush_thread_num_per_store=10"
curl
"http://be_host:webserver_port/api/update_config?max_flush_thread_num_per_cpu=8"
```
---
be/src/common/config.cpp | 26 ++++-
be/src/common/config.h | 6 +-
be/src/olap/memtable_flush_executor.cpp | 29 ++++-
be/src/olap/memtable_flush_executor.h | 3 +
be/src/runtime/workload_group/workload_group.cpp | 29 +++++
be/src/runtime/workload_group/workload_group.h | 3 +
.../workload_group/workload_group_manager.cpp | 7 ++
.../workload_group/workload_group_manager.h | 2 +
be/test/olap/memtable_flush_executor_test.cpp | 123 +++++++++++++++++++++
9 files changed, 217 insertions(+), 11 deletions(-)
diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp
index ac90587173e..1648c502f09 100644
--- a/be/src/common/config.cpp
+++ b/be/src/common/config.cpp
@@ -45,6 +45,10 @@
#include "common/status.h"
#include "io/fs/file_writer.h"
#include "io/fs/local_file_system.h"
+#include "olap/memtable_flush_executor.h"
+#include "olap/storage_engine.h"
+#include "runtime/exec_env.h"
+#include "runtime/workload_group/workload_group_manager.h"
#include "util/cpu_info.h"
#include "util/string_util.h"
@@ -813,12 +817,12 @@ DEFINE_mInt32(storage_flood_stage_usage_percent, "90");
// 90%
// The min bytes that should be left of a data dir
DEFINE_mInt64(storage_flood_stage_left_capacity_bytes, "1073741824"); // 1GB
// number of thread for flushing memtable per store
-DEFINE_Int32(flush_thread_num_per_store, "6");
+DEFINE_mInt32(flush_thread_num_per_store, "6");
// number of thread for flushing memtable per store, for high priority load
task
-DEFINE_Int32(high_priority_flush_thread_num_per_store, "6");
+DEFINE_mInt32(high_priority_flush_thread_num_per_store, "6");
// number of threads = min(flush_thread_num_per_store * num_store,
// max_flush_thread_num_per_cpu * num_cpu)
-DEFINE_Int32(max_flush_thread_num_per_cpu, "4");
+DEFINE_mInt32(max_flush_thread_num_per_cpu, "4");
// config for tablet meta checkpoint
DEFINE_mInt32(tablet_meta_checkpoint_min_new_rowsets_num, "10");
@@ -2100,6 +2104,22 @@ void update_config(const std::string& field, const
std::string& value) {
if ("sys_log_level" == field) {
// update log level
update_logging(field, value);
+ } else if ("flush_thread_num_per_store" == field ||
+ "high_priority_flush_thread_num_per_store" == field ||
+ "max_flush_thread_num_per_cpu" == field) {
+ // update memtable flush thread pool size
+ auto* exec_env = ExecEnv::GetInstance();
+ if (exec_env != nullptr) {
+ auto* flush_executor =
exec_env->storage_engine().memtable_flush_executor();
+ if (flush_executor != nullptr) {
+ flush_executor->update_memtable_flush_threads();
+ }
+ // update workload groups' memtable flush thread pools
+ auto* wg_mgr = exec_env->workload_group_mgr();
+ if (wg_mgr != nullptr) {
+ wg_mgr->update_memtable_flush_threads();
+ }
+ }
}
}
diff --git a/be/src/common/config.h b/be/src/common/config.h
index ab7b103ac2a..4e1ce9c96fc 100644
--- a/be/src/common/config.h
+++ b/be/src/common/config.h
@@ -851,12 +851,12 @@ DECLARE_mInt32(storage_flood_stage_usage_percent); // 90%
// The min bytes that should be left of a data dir
DECLARE_mInt64(storage_flood_stage_left_capacity_bytes); // 1GB
// number of thread for flushing memtable per store
-DECLARE_Int32(flush_thread_num_per_store);
+DECLARE_mInt32(flush_thread_num_per_store);
// number of thread for flushing memtable per store, for high priority load
task
-DECLARE_Int32(high_priority_flush_thread_num_per_store);
+DECLARE_mInt32(high_priority_flush_thread_num_per_store);
// number of threads = min(flush_thread_num_per_store * num_store,
// max_flush_thread_num_per_cpu * num_cpu)
-DECLARE_Int32(max_flush_thread_num_per_cpu);
+DECLARE_mInt32(max_flush_thread_num_per_cpu);
// config for tablet meta checkpoint
DECLARE_mInt32(tablet_meta_checkpoint_min_new_rowsets_num);
diff --git a/be/src/olap/memtable_flush_executor.cpp
b/be/src/olap/memtable_flush_executor.cpp
index f2c2b764644..cdc67ab32b8 100644
--- a/be/src/olap/memtable_flush_executor.cpp
+++ b/be/src/olap/memtable_flush_executor.cpp
@@ -294,11 +294,11 @@ void
FlushToken::_flush_memtable(std::shared_ptr<MemTable> memtable_ptr, int32_t
}
void MemTableFlushExecutor::init(int num_disk) {
- num_disk = std::max(1, num_disk);
+ _num_disk = std::max(1, num_disk);
int num_cpus = std::thread::hardware_concurrency();
int min_threads = std::max(1, config::flush_thread_num_per_store);
- int max_threads = num_cpus == 0 ? num_disk * min_threads
- : std::min(num_disk * min_threads,
+ int max_threads = num_cpus == 0 ? _num_disk * min_threads
+ : std::min(_num_disk * min_threads,
num_cpus *
config::max_flush_thread_num_per_cpu);
static_cast<void>(ThreadPoolBuilder("MemTableFlushThreadPool")
.set_min_threads(min_threads)
@@ -306,8 +306,8 @@ void MemTableFlushExecutor::init(int num_disk) {
.build(&_flush_pool));
min_threads = std::max(1,
config::high_priority_flush_thread_num_per_store);
- max_threads = num_cpus == 0 ? num_disk * min_threads
- : std::min(num_disk * min_threads,
+ max_threads = num_cpus == 0 ? _num_disk * min_threads
+ : std::min(_num_disk * min_threads,
num_cpus *
config::max_flush_thread_num_per_cpu);
static_cast<void>(ThreadPoolBuilder("MemTableHighPriorityFlushThreadPool")
.set_min_threads(min_threads)
@@ -315,6 +315,25 @@ void MemTableFlushExecutor::init(int num_disk) {
.build(&_high_prio_flush_pool));
}
+void MemTableFlushExecutor::update_memtable_flush_threads() {
+ int num_cpus = std::thread::hardware_concurrency();
+ int min_threads = std::max(1, config::flush_thread_num_per_store);
+ int max_threads = num_cpus == 0 ? _num_disk * min_threads
+ : std::min(_num_disk * min_threads,
+ num_cpus *
config::max_flush_thread_num_per_cpu);
+ // Update max_threads first to avoid constraint violation when increasing
min_threads
+ static_cast<void>(_flush_pool->set_max_threads(max_threads));
+ static_cast<void>(_flush_pool->set_min_threads(min_threads));
+
+ min_threads = std::max(1,
config::high_priority_flush_thread_num_per_store);
+ max_threads = num_cpus == 0 ? _num_disk * min_threads
+ : std::min(_num_disk * min_threads,
+ num_cpus *
config::max_flush_thread_num_per_cpu);
+ // Update max_threads first to avoid constraint violation when increasing
min_threads
+ static_cast<void>(_high_prio_flush_pool->set_max_threads(max_threads));
+ static_cast<void>(_high_prio_flush_pool->set_min_threads(min_threads));
+}
+
// NOTE: we use SERIAL mode here to ensure all mem-tables from one tablet are
flushed in order.
Status MemTableFlushExecutor::create_flush_token(std::shared_ptr<FlushToken>&
flush_token,
std::shared_ptr<RowsetWriter>
rowset_writer,
diff --git a/be/src/olap/memtable_flush_executor.h
b/be/src/olap/memtable_flush_executor.h
index ecc062f1948..869fb801ccb 100644
--- a/be/src/olap/memtable_flush_executor.h
+++ b/be/src/olap/memtable_flush_executor.h
@@ -163,10 +163,13 @@ public:
ThreadPool* flush_pool() { return _flush_pool.get(); }
+ void update_memtable_flush_threads();
+
private:
std::unique_ptr<ThreadPool> _flush_pool;
std::unique_ptr<ThreadPool> _high_prio_flush_pool;
std::atomic<int> _flushing_task_count = 0;
+ int _num_disk = 0;
};
} // namespace doris
diff --git a/be/src/runtime/workload_group/workload_group.cpp
b/be/src/runtime/workload_group/workload_group.cpp
index 9b507bbd537..bdb48b25874 100644
--- a/be/src/runtime/workload_group/workload_group.cpp
+++ b/be/src/runtime/workload_group/workload_group.cpp
@@ -26,8 +26,11 @@
#include <ostream>
#include <utility>
+#include "cloud/config.h"
+#include "common/config.h"
#include "common/logging.h"
#include "exec/schema_scanner/schema_scanner_helper.h"
+#include "io/cache/block_file_cache_factory.h"
#include "io/fs/local_file_reader.h"
#include "olap/storage_engine.h"
#include "pipeline/task_queue.h"
@@ -748,6 +751,32 @@ void WorkloadGroup::try_stop_schedulers() {
}
}
+void WorkloadGroup::update_memtable_flush_threads() {
+ if (_memtable_flush_pool == nullptr) {
+ return;
+ }
+
+ int num_disk = 1;
+ int num_cpus = 0;
+#ifndef BE_TEST
+ if (config::is_cloud_mode()) {
+ num_disk =
cast_set<int>(io::FileCacheFactory::instance()->get_cache_instance_size());
+ } else {
+ num_disk = ExecEnv::GetInstance()->storage_engine().get_disk_num();
+ }
+ num_cpus = std::thread::hardware_concurrency();
+#endif
+ num_disk = std::max(1, num_disk);
+ int min_threads = std::max(1, config::flush_thread_num_per_store);
+ int max_threads = num_cpus == 0 ? num_disk * min_threads
+ : std::min(num_disk * min_threads,
+ num_cpus *
config::max_flush_thread_num_per_cpu);
+
+ // Update max_threads first to avoid constraint violation when increasing
min_threads
+ static_cast<void>(_memtable_flush_pool->set_max_threads(max_threads));
+ static_cast<void>(_memtable_flush_pool->set_min_threads(min_threads));
+}
+
#include "common/compile_check_end.h"
} // namespace doris
diff --git a/be/src/runtime/workload_group/workload_group.h
b/be/src/runtime/workload_group/workload_group.h
index 6a563cc9a68..d398c7fac0a 100644
--- a/be/src/runtime/workload_group/workload_group.h
+++ b/be/src/runtime/workload_group/workload_group.h
@@ -193,6 +193,9 @@ public:
// to avoid lock competition with the workload thread pool's update
return _memtable_flush_pool.get();
}
+
+ void update_memtable_flush_threads();
+
void create_cgroup_cpu_ctl();
std::weak_ptr<CgroupCpuCtl> get_cgroup_cpu_ctl_wptr();
diff --git a/be/src/runtime/workload_group/workload_group_manager.cpp
b/be/src/runtime/workload_group/workload_group_manager.cpp
index ef8d3534eb6..d226120a1ad 100644
--- a/be/src/runtime/workload_group/workload_group_manager.cpp
+++ b/be/src/runtime/workload_group/workload_group_manager.cpp
@@ -278,6 +278,13 @@ void WorkloadGroupMgr::refresh_workload_group_metrics() {
}
}
+void WorkloadGroupMgr::update_memtable_flush_threads() {
+ std::shared_lock<std::shared_mutex> r_lock(_group_mutex);
+ for (const auto& [id, wg] : _workload_groups) {
+ wg->update_memtable_flush_threads();
+ }
+}
+
void WorkloadGroupMgr::add_paused_query(const
std::shared_ptr<ResourceContext>& resource_ctx,
int64_t reserve_size, const Status&
status) {
DCHECK(resource_ctx != nullptr);
diff --git a/be/src/runtime/workload_group/workload_group_manager.h
b/be/src/runtime/workload_group/workload_group_manager.h
index 20f8a926148..016b6f73a56 100644
--- a/be/src/runtime/workload_group/workload_group_manager.h
+++ b/be/src/runtime/workload_group/workload_group_manager.h
@@ -89,6 +89,8 @@ public:
void refresh_workload_group_metrics();
+ void update_memtable_flush_threads();
+
MOCK_FUNCTION void add_paused_query(const
std::shared_ptr<ResourceContext>& resource_ctx,
int64_t reserve_size, const Status&
status);
diff --git a/be/test/olap/memtable_flush_executor_test.cpp
b/be/test/olap/memtable_flush_executor_test.cpp
index 687511a47d1..a3ab561c862 100644
--- a/be/test/olap/memtable_flush_executor_test.cpp
+++ b/be/test/olap/memtable_flush_executor_test.cpp
@@ -21,7 +21,9 @@
#include <sys/file.h>
#include <string>
+#include <thread>
+#include "common/config.h"
#include "gen_cpp/Descriptors_types.h"
#include "gen_cpp/PaloInternalService_types.h"
#include "gen_cpp/Types_types.h"
@@ -83,4 +85,125 @@ Schema create_schema() {
return schema;
}
+TEST(MemTableFlushExecutorTest, TestDynamicThreadPoolUpdate) {
+ // Setup
+ set_up();
+
+ auto* flush_executor =
ExecEnv::GetInstance()->storage_engine().memtable_flush_executor();
+ ASSERT_NE(flush_executor, nullptr);
+
+ // Store original config values
+ int32_t original_flush_thread_num = config::flush_thread_num_per_store;
+ int32_t original_high_priority_flush_thread_num =
+ config::high_priority_flush_thread_num_per_store;
+ int32_t original_max_flush_thread_num =
config::max_flush_thread_num_per_cpu;
+
+ // Test 1: Get initial thread pool sizes
+ int initial_max_threads = flush_executor->flush_pool()->max_threads();
+ int initial_min_threads = flush_executor->flush_pool()->min_threads();
+ EXPECT_GT(initial_max_threads, 0);
+ EXPECT_GT(initial_min_threads, 0);
+
+ // Test 2: Update flush_thread_num_per_store and verify thread pool updates
+ config::flush_thread_num_per_store = 10;
+ flush_executor->update_memtable_flush_threads();
+
+ int new_min_threads = flush_executor->flush_pool()->min_threads();
+ EXPECT_EQ(new_min_threads, 10);
+
+ // Test 3: Update max_flush_thread_num_per_cpu and verify thread pool
updates
+ config::max_flush_thread_num_per_cpu = 2;
+ flush_executor->update_memtable_flush_threads();
+
+ int num_cpus = std::thread::hardware_concurrency();
+ if (num_cpus > 0) {
+ int expected_max = std::min(10 * 1, num_cpus * 2); // 1 disk, 10
threads per store
+ int actual_max = flush_executor->flush_pool()->max_threads();
+ EXPECT_EQ(actual_max, expected_max);
+ }
+
+ // Test 4: Update high_priority_flush_thread_num_per_store
+ config::high_priority_flush_thread_num_per_store = 8;
+ flush_executor->update_memtable_flush_threads();
+ // Note: We can't directly access _high_prio_flush_pool, but update should
not crash
+
+ // Test 5: Set very small values
+ config::flush_thread_num_per_store = 0; // Should be adjusted to 1 by
std::max
+ flush_executor->update_memtable_flush_threads();
+ EXPECT_GE(flush_executor->flush_pool()->min_threads(), 1);
+
+ // Test 6: Set large values
+ config::flush_thread_num_per_store = 100;
+ flush_executor->update_memtable_flush_threads();
+ EXPECT_GE(flush_executor->flush_pool()->min_threads(), 1);
+
+ // Restore original config values
+ config::flush_thread_num_per_store = original_flush_thread_num;
+ config::high_priority_flush_thread_num_per_store =
original_high_priority_flush_thread_num;
+ config::max_flush_thread_num_per_cpu = original_max_flush_thread_num;
+ flush_executor->update_memtable_flush_threads();
+
+ // Cleanup
+ tear_down();
+}
+
+TEST(MemTableFlushExecutorTest, TestConfigUpdateTrigger) {
+ // Setup
+ set_up();
+
+ auto* flush_executor =
ExecEnv::GetInstance()->storage_engine().memtable_flush_executor();
+ ASSERT_NE(flush_executor, nullptr);
+
+ // Store original config values
+ int32_t original_flush_thread_num = config::flush_thread_num_per_store;
+
+ // Get initial thread pool size
+ int initial_min_threads = flush_executor->flush_pool()->min_threads();
+
+ // Test: Simulate config update via set_config
+ config::flush_thread_num_per_store = 15;
+ config::update_config("flush_thread_num_per_store", "15");
+
+ // Verify thread pool was updated
+ int updated_min_threads = flush_executor->flush_pool()->min_threads();
+ EXPECT_EQ(updated_min_threads, 15);
+ EXPECT_NE(updated_min_threads, initial_min_threads);
+
+ // Restore original config value
+ config::flush_thread_num_per_store = original_flush_thread_num;
+ flush_executor->update_memtable_flush_threads();
+
+ // Cleanup
+ tear_down();
+}
+
+TEST(MemTableFlushExecutorTest, TestThreadPoolMinMaxRelationship) {
+ // Setup
+ set_up();
+
+ auto* flush_executor =
ExecEnv::GetInstance()->storage_engine().memtable_flush_executor();
+ ASSERT_NE(flush_executor, nullptr);
+
+ // Store original config values
+ int32_t original_flush_thread_num = config::flush_thread_num_per_store;
+ int32_t original_max_flush_thread_num =
config::max_flush_thread_num_per_cpu;
+
+ // Test: Ensure min_threads <= max_threads always
+ config::flush_thread_num_per_store = 20;
+ config::max_flush_thread_num_per_cpu = 1; // Very restrictive
+ flush_executor->update_memtable_flush_threads();
+
+ int min_threads = flush_executor->flush_pool()->min_threads();
+ int max_threads = flush_executor->flush_pool()->max_threads();
+ EXPECT_LE(min_threads, max_threads);
+
+ // Restore original config values
+ config::flush_thread_num_per_store = original_flush_thread_num;
+ config::max_flush_thread_num_per_cpu = original_max_flush_thread_num;
+ flush_executor->update_memtable_flush_threads();
+
+ // Cleanup
+ tear_down();
+}
+
} // namespace doris
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]