This is an automated email from the ASF dual-hosted git repository.

diwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new b633fbe899a [enhance](memtable) support dynamic modification of flush thread pool size (#60423)
b633fbe899a is described below

commit b633fbe899abbd614ff4da19b59927c7d598d194
Author: hui lai <[email protected]>
AuthorDate: Thu Feb 5 13:02:11 2026 +0800

    [enhance](memtable) support dynamic modification of flush thread pool size (#60423)
    
    ### What problem does this PR solve?
    
    Support dynamic modification of the memtable flush thread pool size, so it can be tuned at runtime without restarting the BE.
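
    The effective pool size still follows the existing sizing rule from
    memtable_flush_executor.cpp: num_threads = min(flush_thread_num_per_store *
    num_store, max_flush_thread_num_per_cpu * num_cpu). As a quick sketch with
    assumed example values (4 data dirs and 16 CPUs; not taken from this
    patch), the cap works out like this:

    ```
    # Hypothetical sizing walk-through mirroring the formula above (bash).
    num_store=4; num_cpu=16
    flush_thread_num_per_store=10
    max_flush_thread_num_per_cpu=8
    per_store=$((flush_thread_num_per_store * num_store))  # 10 * 4  = 40
    per_cpu=$((max_flush_thread_num_per_cpu * num_cpu))    # 8  * 16 = 128
    echo $((per_store < per_cpu ? per_store : per_cpu))    # min(40, 128) = 40
    ```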
    
    Usage Example:
    ```
    # Update flush thread pool size without restarting BE
    curl "http://be_host:webserver_port/api/update_config?flush_thread_num_per_store=10"
    curl "http://be_host:webserver_port/api/update_config?max_flush_thread_num_per_cpu=8"
    ```
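
    To confirm the new values took effect, the BE config can be read back over
    HTTP. A minimal sketch, assuming the show_config endpoint is available in
    your build (output format may differ across versions):

    ```
    # Read back the updated values; the grep patterns are illustrative.
    curl "http://be_host:webserver_port/api/show_config" | grep flush_thread_num_per_store
    curl "http://be_host:webserver_port/api/show_config" | grep max_flush_thread_num_per_cpu
    ```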
---
 be/src/common/config.cpp                           |  26 ++++-
 be/src/common/config.h                             |   6 +-
 be/src/olap/memtable_flush_executor.cpp            |  29 ++++-
 be/src/olap/memtable_flush_executor.h              |   3 +
 be/src/runtime/workload_group/workload_group.cpp   |  29 +++++
 be/src/runtime/workload_group/workload_group.h     |   3 +
 .../workload_group/workload_group_manager.cpp      |   7 ++
 .../workload_group/workload_group_manager.h        |   2 +
 be/test/olap/memtable_flush_executor_test.cpp      | 123 +++++++++++++++++++++
 9 files changed, 217 insertions(+), 11 deletions(-)

diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp
index ac90587173e..1648c502f09 100644
--- a/be/src/common/config.cpp
+++ b/be/src/common/config.cpp
@@ -45,6 +45,10 @@
 #include "common/status.h"
 #include "io/fs/file_writer.h"
 #include "io/fs/local_file_system.h"
+#include "olap/memtable_flush_executor.h"
+#include "olap/storage_engine.h"
+#include "runtime/exec_env.h"
+#include "runtime/workload_group/workload_group_manager.h"
 #include "util/cpu_info.h"
 #include "util/string_util.h"
 
@@ -813,12 +817,12 @@ DEFINE_mInt32(storage_flood_stage_usage_percent, "90"); // 90%
 // The min bytes that should be left of a data dir
 DEFINE_mInt64(storage_flood_stage_left_capacity_bytes, "1073741824"); // 1GB
 // number of threads for flushing memtable per store
-DEFINE_Int32(flush_thread_num_per_store, "6");
+DEFINE_mInt32(flush_thread_num_per_store, "6");
 // number of threads for flushing memtable per store, for high priority load tasks
-DEFINE_Int32(high_priority_flush_thread_num_per_store, "6");
+DEFINE_mInt32(high_priority_flush_thread_num_per_store, "6");
 // number of threads = min(flush_thread_num_per_store * num_store,
 //                         max_flush_thread_num_per_cpu * num_cpu)
-DEFINE_Int32(max_flush_thread_num_per_cpu, "4");
+DEFINE_mInt32(max_flush_thread_num_per_cpu, "4");
 
 // config for tablet meta checkpoint
 DEFINE_mInt32(tablet_meta_checkpoint_min_new_rowsets_num, "10");
@@ -2100,6 +2104,22 @@ void update_config(const std::string& field, const std::string& value) {
     if ("sys_log_level" == field) {
         // update log level
         update_logging(field, value);
+    } else if ("flush_thread_num_per_store" == field ||
+               "high_priority_flush_thread_num_per_store" == field ||
+               "max_flush_thread_num_per_cpu" == field) {
+        // update memtable flush thread pool size
+        auto* exec_env = ExecEnv::GetInstance();
+        if (exec_env != nullptr) {
+        auto* flush_executor = exec_env->storage_engine().memtable_flush_executor();
+            if (flush_executor != nullptr) {
+                flush_executor->update_memtable_flush_threads();
+            }
+            // update workload groups' memtable flush thread pools
+            auto* wg_mgr = exec_env->workload_group_mgr();
+            if (wg_mgr != nullptr) {
+                wg_mgr->update_memtable_flush_threads();
+            }
+        }
     }
 }
 
diff --git a/be/src/common/config.h b/be/src/common/config.h
index ab7b103ac2a..4e1ce9c96fc 100644
--- a/be/src/common/config.h
+++ b/be/src/common/config.h
@@ -851,12 +851,12 @@ DECLARE_mInt32(storage_flood_stage_usage_percent); // 90%
 // The min bytes that should be left of a data dir
 DECLARE_mInt64(storage_flood_stage_left_capacity_bytes); // 1GB
 // number of threads for flushing memtable per store
-DECLARE_Int32(flush_thread_num_per_store);
+DECLARE_mInt32(flush_thread_num_per_store);
 // number of threads for flushing memtable per store, for high priority load tasks
-DECLARE_Int32(high_priority_flush_thread_num_per_store);
+DECLARE_mInt32(high_priority_flush_thread_num_per_store);
 // number of threads = min(flush_thread_num_per_store * num_store,
 //                         max_flush_thread_num_per_cpu * num_cpu)
-DECLARE_Int32(max_flush_thread_num_per_cpu);
+DECLARE_mInt32(max_flush_thread_num_per_cpu);
 
 // config for tablet meta checkpoint
 DECLARE_mInt32(tablet_meta_checkpoint_min_new_rowsets_num);
diff --git a/be/src/olap/memtable_flush_executor.cpp b/be/src/olap/memtable_flush_executor.cpp
index f2c2b764644..cdc67ab32b8 100644
--- a/be/src/olap/memtable_flush_executor.cpp
+++ b/be/src/olap/memtable_flush_executor.cpp
@@ -294,11 +294,11 @@ void FlushToken::_flush_memtable(std::shared_ptr<MemTable> memtable_ptr, int32_t
 }
 
 void MemTableFlushExecutor::init(int num_disk) {
-    num_disk = std::max(1, num_disk);
+    _num_disk = std::max(1, num_disk);
     int num_cpus = std::thread::hardware_concurrency();
     int min_threads = std::max(1, config::flush_thread_num_per_store);
-    int max_threads = num_cpus == 0 ? num_disk * min_threads
-                                    : std::min(num_disk * min_threads,
+    int max_threads = num_cpus == 0 ? _num_disk * min_threads
+                                    : std::min(_num_disk * min_threads,
                                               num_cpus * config::max_flush_thread_num_per_cpu);
     static_cast<void>(ThreadPoolBuilder("MemTableFlushThreadPool")
                               .set_min_threads(min_threads)
@@ -306,8 +306,8 @@ void MemTableFlushExecutor::init(int num_disk) {
                               .build(&_flush_pool));
 
     min_threads = std::max(1, config::high_priority_flush_thread_num_per_store);
-    max_threads = num_cpus == 0 ? num_disk * min_threads
-                                : std::min(num_disk * min_threads,
+    max_threads = num_cpus == 0 ? _num_disk * min_threads
+                                : std::min(_num_disk * min_threads,
                                            num_cpus * config::max_flush_thread_num_per_cpu);
     static_cast<void>(ThreadPoolBuilder("MemTableHighPriorityFlushThreadPool")
                               .set_min_threads(min_threads)
@@ -315,6 +315,25 @@ void MemTableFlushExecutor::init(int num_disk) {
                               .build(&_high_prio_flush_pool));
 }
 
+void MemTableFlushExecutor::update_memtable_flush_threads() {
+    int num_cpus = std::thread::hardware_concurrency();
+    int min_threads = std::max(1, config::flush_thread_num_per_store);
+    int max_threads = num_cpus == 0 ? _num_disk * min_threads
+                                    : std::min(_num_disk * min_threads,
+                                               num_cpus * config::max_flush_thread_num_per_cpu);
+    // Update max_threads first to avoid constraint violation when increasing min_threads
+    static_cast<void>(_flush_pool->set_max_threads(max_threads));
+    static_cast<void>(_flush_pool->set_min_threads(min_threads));
+
+    min_threads = std::max(1, config::high_priority_flush_thread_num_per_store);
+    max_threads = num_cpus == 0 ? _num_disk * min_threads
+                                : std::min(_num_disk * min_threads,
+                                           num_cpus * config::max_flush_thread_num_per_cpu);
+    // Update max_threads first to avoid constraint violation when increasing min_threads
+    static_cast<void>(_high_prio_flush_pool->set_max_threads(max_threads));
+    static_cast<void>(_high_prio_flush_pool->set_min_threads(min_threads));
+}
+
 // NOTE: we use SERIAL mode here to ensure all mem-tables from one tablet are flushed in order.
 Status MemTableFlushExecutor::create_flush_token(std::shared_ptr<FlushToken>& flush_token,
                                                  std::shared_ptr<RowsetWriter> rowset_writer,
diff --git a/be/src/olap/memtable_flush_executor.h b/be/src/olap/memtable_flush_executor.h
index ecc062f1948..869fb801ccb 100644
--- a/be/src/olap/memtable_flush_executor.h
+++ b/be/src/olap/memtable_flush_executor.h
@@ -163,10 +163,13 @@ public:
 
     ThreadPool* flush_pool() { return _flush_pool.get(); }
 
+    void update_memtable_flush_threads();
+
 private:
     std::unique_ptr<ThreadPool> _flush_pool;
     std::unique_ptr<ThreadPool> _high_prio_flush_pool;
     std::atomic<int> _flushing_task_count = 0;
+    int _num_disk = 0;
 };
 
 } // namespace doris
diff --git a/be/src/runtime/workload_group/workload_group.cpp b/be/src/runtime/workload_group/workload_group.cpp
index 9b507bbd537..bdb48b25874 100644
--- a/be/src/runtime/workload_group/workload_group.cpp
+++ b/be/src/runtime/workload_group/workload_group.cpp
@@ -26,8 +26,11 @@
 #include <ostream>
 #include <utility>
 
+#include "cloud/config.h"
+#include "common/config.h"
 #include "common/logging.h"
 #include "exec/schema_scanner/schema_scanner_helper.h"
+#include "io/cache/block_file_cache_factory.h"
 #include "io/fs/local_file_reader.h"
 #include "olap/storage_engine.h"
 #include "pipeline/task_queue.h"
@@ -748,6 +751,32 @@ void WorkloadGroup::try_stop_schedulers() {
     }
 }
 
+void WorkloadGroup::update_memtable_flush_threads() {
+    if (_memtable_flush_pool == nullptr) {
+        return;
+    }
+
+    int num_disk = 1;
+    int num_cpus = 0;
+#ifndef BE_TEST
+    if (config::is_cloud_mode()) {
+        num_disk = cast_set<int>(io::FileCacheFactory::instance()->get_cache_instance_size());
+    } else {
+        num_disk = ExecEnv::GetInstance()->storage_engine().get_disk_num();
+    }
+    num_cpus = std::thread::hardware_concurrency();
+#endif
+    num_disk = std::max(1, num_disk);
+    int min_threads = std::max(1, config::flush_thread_num_per_store);
+    int max_threads = num_cpus == 0 ? num_disk * min_threads
+                                    : std::min(num_disk * min_threads,
+                                               num_cpus * config::max_flush_thread_num_per_cpu);
+
+    // Update max_threads first to avoid constraint violation when increasing min_threads
+    static_cast<void>(_memtable_flush_pool->set_max_threads(max_threads));
+    static_cast<void>(_memtable_flush_pool->set_min_threads(min_threads));
+}
+
 #include "common/compile_check_end.h"
 
 } // namespace doris
diff --git a/be/src/runtime/workload_group/workload_group.h b/be/src/runtime/workload_group/workload_group.h
index 6a563cc9a68..d398c7fac0a 100644
--- a/be/src/runtime/workload_group/workload_group.h
+++ b/be/src/runtime/workload_group/workload_group.h
@@ -193,6 +193,9 @@ public:
         // to avoid lock competition with the workload thread pool's update
         return _memtable_flush_pool.get();
     }
+
+    void update_memtable_flush_threads();
+
     void create_cgroup_cpu_ctl();
 
     std::weak_ptr<CgroupCpuCtl> get_cgroup_cpu_ctl_wptr();
diff --git a/be/src/runtime/workload_group/workload_group_manager.cpp b/be/src/runtime/workload_group/workload_group_manager.cpp
index ef8d3534eb6..d226120a1ad 100644
--- a/be/src/runtime/workload_group/workload_group_manager.cpp
+++ b/be/src/runtime/workload_group/workload_group_manager.cpp
@@ -278,6 +278,13 @@ void WorkloadGroupMgr::refresh_workload_group_metrics() {
     }
 }
 
+void WorkloadGroupMgr::update_memtable_flush_threads() {
+    std::shared_lock<std::shared_mutex> r_lock(_group_mutex);
+    for (const auto& [id, wg] : _workload_groups) {
+        wg->update_memtable_flush_threads();
+    }
+}
+
 void WorkloadGroupMgr::add_paused_query(const std::shared_ptr<ResourceContext>& resource_ctx,
                                         int64_t reserve_size, const Status& status) {
     DCHECK(resource_ctx != nullptr);
diff --git a/be/src/runtime/workload_group/workload_group_manager.h b/be/src/runtime/workload_group/workload_group_manager.h
index 20f8a926148..016b6f73a56 100644
--- a/be/src/runtime/workload_group/workload_group_manager.h
+++ b/be/src/runtime/workload_group/workload_group_manager.h
@@ -89,6 +89,8 @@ public:
 
     void refresh_workload_group_metrics();
 
+    void update_memtable_flush_threads();
+
     MOCK_FUNCTION void add_paused_query(const std::shared_ptr<ResourceContext>& resource_ctx,
                                         int64_t reserve_size, const Status& status);
 
diff --git a/be/test/olap/memtable_flush_executor_test.cpp b/be/test/olap/memtable_flush_executor_test.cpp
index 687511a47d1..a3ab561c862 100644
--- a/be/test/olap/memtable_flush_executor_test.cpp
+++ b/be/test/olap/memtable_flush_executor_test.cpp
@@ -21,7 +21,9 @@
 #include <sys/file.h>
 
 #include <string>
+#include <thread>
 
+#include "common/config.h"
 #include "gen_cpp/Descriptors_types.h"
 #include "gen_cpp/PaloInternalService_types.h"
 #include "gen_cpp/Types_types.h"
@@ -83,4 +85,125 @@ Schema create_schema() {
     return schema;
 }
 
+TEST(MemTableFlushExecutorTest, TestDynamicThreadPoolUpdate) {
+    // Setup
+    set_up();
+
+    auto* flush_executor = ExecEnv::GetInstance()->storage_engine().memtable_flush_executor();
+    ASSERT_NE(flush_executor, nullptr);
+
+    // Store original config values
+    int32_t original_flush_thread_num = config::flush_thread_num_per_store;
+    int32_t original_high_priority_flush_thread_num =
+            config::high_priority_flush_thread_num_per_store;
+    int32_t original_max_flush_thread_num = config::max_flush_thread_num_per_cpu;
+
+    // Test 1: Get initial thread pool sizes
+    int initial_max_threads = flush_executor->flush_pool()->max_threads();
+    int initial_min_threads = flush_executor->flush_pool()->min_threads();
+    EXPECT_GT(initial_max_threads, 0);
+    EXPECT_GT(initial_min_threads, 0);
+
+    // Test 2: Update flush_thread_num_per_store and verify thread pool updates
+    config::flush_thread_num_per_store = 10;
+    flush_executor->update_memtable_flush_threads();
+
+    int new_min_threads = flush_executor->flush_pool()->min_threads();
+    EXPECT_EQ(new_min_threads, 10);
+
+    // Test 3: Update max_flush_thread_num_per_cpu and verify thread pool updates
+    config::max_flush_thread_num_per_cpu = 2;
+    flush_executor->update_memtable_flush_threads();
+
+    int num_cpus = std::thread::hardware_concurrency();
+    if (num_cpus > 0) {
+        int expected_max = std::min(10 * 1, num_cpus * 2); // 1 disk, 10 threads per store
+        int actual_max = flush_executor->flush_pool()->max_threads();
+        EXPECT_EQ(actual_max, expected_max);
+    }
+
+    // Test 4: Update high_priority_flush_thread_num_per_store
+    config::high_priority_flush_thread_num_per_store = 8;
+    flush_executor->update_memtable_flush_threads();
+    // Note: We can't directly access _high_prio_flush_pool, but update should not crash
+
+    // Test 5: Set very small values
+    config::flush_thread_num_per_store = 0; // Should be adjusted to 1 by std::max
+    flush_executor->update_memtable_flush_threads();
+    EXPECT_GE(flush_executor->flush_pool()->min_threads(), 1);
+
+    // Test 6: Set large values
+    config::flush_thread_num_per_store = 100;
+    flush_executor->update_memtable_flush_threads();
+    EXPECT_GE(flush_executor->flush_pool()->min_threads(), 1);
+
+    // Restore original config values
+    config::flush_thread_num_per_store = original_flush_thread_num;
+    config::high_priority_flush_thread_num_per_store = original_high_priority_flush_thread_num;
+    config::max_flush_thread_num_per_cpu = original_max_flush_thread_num;
+    flush_executor->update_memtable_flush_threads();
+
+    // Cleanup
+    tear_down();
+}
+
+TEST(MemTableFlushExecutorTest, TestConfigUpdateTrigger) {
+    // Setup
+    set_up();
+
+    auto* flush_executor = ExecEnv::GetInstance()->storage_engine().memtable_flush_executor();
+    ASSERT_NE(flush_executor, nullptr);
+
+    // Store original config values
+    int32_t original_flush_thread_num = config::flush_thread_num_per_store;
+
+    // Get initial thread pool size
+    int initial_min_threads = flush_executor->flush_pool()->min_threads();
+
+    // Test: Simulate config update via set_config
+    config::flush_thread_num_per_store = 15;
+    config::update_config("flush_thread_num_per_store", "15");
+
+    // Verify thread pool was updated
+    int updated_min_threads = flush_executor->flush_pool()->min_threads();
+    EXPECT_EQ(updated_min_threads, 15);
+    EXPECT_NE(updated_min_threads, initial_min_threads);
+
+    // Restore original config value
+    config::flush_thread_num_per_store = original_flush_thread_num;
+    flush_executor->update_memtable_flush_threads();
+
+    // Cleanup
+    tear_down();
+}
+
+TEST(MemTableFlushExecutorTest, TestThreadPoolMinMaxRelationship) {
+    // Setup
+    set_up();
+
+    auto* flush_executor = ExecEnv::GetInstance()->storage_engine().memtable_flush_executor();
+    ASSERT_NE(flush_executor, nullptr);
+
+    // Store original config values
+    int32_t original_flush_thread_num = config::flush_thread_num_per_store;
+    int32_t original_max_flush_thread_num = config::max_flush_thread_num_per_cpu;
+
+    // Test: Ensure min_threads <= max_threads always
+    config::flush_thread_num_per_store = 20;
+    config::max_flush_thread_num_per_cpu = 1; // Very restrictive
+    flush_executor->update_memtable_flush_threads();
+
+    int min_threads = flush_executor->flush_pool()->min_threads();
+    int max_threads = flush_executor->flush_pool()->max_threads();
+    EXPECT_LE(min_threads, max_threads);
+
+    // Restore original config values
+    config::flush_thread_num_per_store = original_flush_thread_num;
+    config::max_flush_thread_num_per_cpu = original_max_flush_thread_num;
+    flush_executor->update_memtable_flush_threads();
+
+    // Cleanup
+    tear_down();
+}
+
 } // namespace doris


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
