github-actions[bot] commented on code in PR #60617:
URL: https://github.com/apache/doris/pull/60617#discussion_r3009496599


##########
be/src/load/memtable/memtable_memory_limiter.h:
##########
@@ -54,6 +54,8 @@ class MemTableMemoryLimiter {
 
     int64_t mem_usage() const { return _mem_usage; }
 
+    bool soft_limit_reached() { return _soft_limit_reached(); }
+
 private:
     static inline int64_t _sys_avail_mem_less_than_warning_water_mark();

Review Comment:
   **Dead code:** This public `soft_limit_reached()` method is never called 
anywhere in the codebase, including the new adaptive controller code. Please 
remove it or use it in the adaptive adjustment logic (e.g., scale up threads 
when soft limit is reached).



##########
be/src/runtime/workload_group/workload_group.cpp:
##########
@@ -434,11 +437,9 @@ WorkloadGroupInfo WorkloadGroupInfo::parse_topic_info(
     num_cpus = std::thread::hardware_concurrency();
 #endif
     num_disk = std::max(1, num_disk);
-    int min_flush_thread_num = std::max(1, config::flush_thread_num_per_store);
-    int max_flush_thread_num = num_cpus == 0

Review Comment:
   **Asymmetric registration/cancellation when config changes dynamically:** 
The `config::enable_adaptive_flush_threads` check here at creation time is 
independent from the matching check in `try_stop_schedulers()`. If the config 
is toggled between creation and destruction (it's `mutable`), the pool may be 
registered but never cancelled (leaking a timer event with a dangling pool 
pointer) or cancelled without being registered (benign). Consider always 
registering/cancelling unconditionally, relying on the `_fire_group()` 
early-return check for the dynamic on/off behavior.



##########
be/src/storage/storage_engine.cpp:
##########
@@ -140,6 +141,28 @@ int64_t 
BaseStorageEngine::memory_limitation_bytes_per_thread_for_schema_change(
                     
config::memory_limitation_per_thread_for_schema_change_bytes);
 }
 
+void BaseStorageEngine::_start_adaptive_thread_controller() {
+    if (!config::enable_adaptive_flush_threads) {
+        return;
+    }
+
+    auto* system_metrics = DorisMetrics::instance()->system_metrics();

Review Comment:
   **Both pools adjusted by single queue metric:** The `"flush"` group contains 
both `flush_pool` and `high_prio_pool`, but `make_flush_adjust_func` only 
monitors `flush_pool->get_queue_size()`. If the high-priority pool has a deep 
queue while the regular pool doesn't, the system won't scale up. Consider 
either:
   1. Monitoring both pools' queue sizes in the adjust function, or
   2. Registering them as separate groups with individual adjust functions.



##########
be/src/common/config.cpp:
##########
@@ -824,6 +824,12 @@ DEFINE_mInt32(high_priority_flush_thread_num_per_store, 
"6");
 //                         max_flush_thread_num_per_cpu * num_cpu)
 DEFINE_mInt32(max_flush_thread_num_per_cpu, "4");
 
+// minimum flush threads per cpu when adaptive flush is enabled (default 0.5)
+DEFINE_mDouble(min_flush_thread_num_per_cpu, "0.5");
+
+// Whether to enable adaptive flush thread adjustment
+DEFINE_mBool(enable_adaptive_flush_threads, "true");
+

Review Comment:
   **Default `true` is a behavior change for existing deployments.** When 
`enable_adaptive_flush_threads` is `true`, `calc_flush_thread_count()` ignores 
`flush_thread_num_per_store` and `num_disk`, which changes thread pool sizing 
for all users on upgrade. Consider defaulting to `false` for safe rollout, or 
at minimum call this out in the release notes.



##########
be/src/storage/adaptive_thread_pool_controller.h:
##########
@@ -0,0 +1,134 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <cstdint>
+#include <functional>
+#include <map>
+#include <mutex>
+#include <string>
+#include <vector>
+
+#include "util/timer.h"
+
+namespace doris {
+
+class ThreadPool;
+class SystemMetrics;
+
+// AdaptiveThreadPoolController dynamically adjusts thread pool sizes based on
+// system load (IO utilisation, CPU load average, flush queue depth).
+//
+// Scheduling is fully delegated to a Timer (be/src/util/timer.h): each
+// registered pool group becomes a recurring Timer event. The controller itself
+// contains no threads or sleep loops — it only holds adjustment logic and
+// pool-group metadata.
+//
+// Usage:
+//   Timer timer;
+//   timer.start();
+//   AdaptiveThreadPoolController ctrl;
+//   ctrl.init(system_metrics, s3_pool, &timer);
+//   ctrl.add("flush", {pool1, pool2},
+//       AdaptiveThreadPoolController::make_flush_adjust_func(&ctrl, pool1),
+//       max_per_cpu, min_per_cpu);
+//   // ... later ...
+//   ctrl.cancel("flush");
+//   timer.stop();
+class AdaptiveThreadPoolController {
+public:
+    using AdjustFunc =
+            std::function<int(int current, int min_threads, int max_threads, 
std::string& reason)>;
+
+    static constexpr int kDefaultIntervalMs = 10000;
+
+    static constexpr int kQueueThreshold = 10;
+    static constexpr int kIOBusyThresholdPercent = 90;
+    static constexpr int kCPUBusyThresholdPercent = 90;
+    static constexpr int kS3QueueBusyThreshold = 100;
+
+    AdaptiveThreadPoolController() = default;
+    ~AdaptiveThreadPoolController() { stop(); }
+
+    // Initialize with system-level dependencies. `timer` must outlive this 
controller.
+    void init(SystemMetrics* system_metrics, ThreadPool* s3_file_upload_pool, 
Timer* timer);
+
+    // Cancel all registered pool groups. Must be called before the pools are 
destroyed.
+    void stop();
+
+    // Register a pool group and schedule a recurring adjustment event on the 
timer.
+    void add(std::string name, std::vector<ThreadPool*> pools, AdjustFunc 
adjust_func,
+             double max_threads_per_cpu, double min_threads_per_cpu,
+             Timer::Duration interval = 
std::chrono::milliseconds(kDefaultIntervalMs));
+
+    // Cancel the recurring event and remove the pool group.
+    // Must be called before the pools are destroyed to prevent UAF.
+    void cancel(const std::string& name);
+
+    // Fire all registered groups once, ignoring the schedule. For testing.
+    void adjust_once();
+
+    // Get current thread count for a named group. For testing/debugging.
+    int get_current_threads(const std::string& name) const;
+
+    // System-state helpers; safe to call from inside an AdjustFunc.
+    bool is_io_busy();
+    bool is_cpu_busy();
+
+    // Factory: standard flush-pool adjust function.
+    static AdjustFunc make_flush_adjust_func(AdaptiveThreadPoolController* 
controller,
+                                             ThreadPool* flush_pool);
+
+private:
+    struct PoolGroup {
+        std::string name;
+        std::vector<ThreadPool*> pools;
+        AdjustFunc adjust_func;
+        double max_threads_per_cpu = 4.0;
+        double min_threads_per_cpu = 0.5;
+        int current_threads = 0;
+
+        int get_max_threads() const;
+        int get_min_threads() const;
+    };
+
+    // Run one group's adjustment. Called from the Timer thread (no lock held 
on entry).

Review Comment:
   **Thread-safety invariant not documented:** The fields `_last_io_busy`, 
`_last_disk_io_time`, and `_last_check_time_sec` are accessed without `_mutex` 
protection. This is safe only because the Timer is single-threaded and 
`is_io_busy()` / `is_cpu_busy()` are only called from Timer callbacks. Please 
add a comment documenting this invariant, e.g.:
   ```cpp
   // These fields are only accessed from the Timer callback thread 
(single-threaded).
   // They must NOT be accessed from other threads without additional 
synchronization.
   ```



##########
be/src/storage/adaptive_thread_pool_controller.cpp:
##########
@@ -0,0 +1,257 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "storage/adaptive_thread_pool_controller.h"
+
+#include <algorithm>
+#include <thread>
+
+#include "cloud/config.h"
+#include "common/config.h"
+#include "common/logging.h"
+#include "common/metrics/system_metrics.h"
+#include "common/status.h"
+#include "util/threadpool.h"
+#include "util/time.h"
+
+namespace doris {
+
+int AdaptiveThreadPoolController::PoolGroup::get_max_threads() const {
+    int num_cpus = std::thread::hardware_concurrency();
+    if (num_cpus <= 0) num_cpus = 1;
+    return static_cast<int>(num_cpus * max_threads_per_cpu);
+}
+
+int AdaptiveThreadPoolController::PoolGroup::get_min_threads() const {
+    int num_cpus = std::thread::hardware_concurrency();
+    if (num_cpus <= 0) num_cpus = 1;
+    return std::max(1, static_cast<int>(num_cpus * min_threads_per_cpu));
+}
+
+void AdaptiveThreadPoolController::init(SystemMetrics* system_metrics,
+                                        ThreadPool* s3_file_upload_pool, 
Timer* timer) {
+    _system_metrics = system_metrics;
+    _s3_file_upload_pool = s3_file_upload_pool;
+    _timer = timer;
+}
+
+void AdaptiveThreadPoolController::stop() {
+    std::vector<std::string> names;
+    {
+        std::lock_guard<std::mutex> lk(_mutex);
+        for (const auto& [name, _] : _pool_groups) {
+            names.push_back(name);
+        }
+    }
+    for (const auto& name : names) {
+        cancel(name);
+    }
+}
+
+void AdaptiveThreadPoolController::add(std::string name, 
std::vector<ThreadPool*> pools,
+                                       AdjustFunc adjust_func, double 
max_threads_per_cpu,
+                                       double min_threads_per_cpu, 
Timer::Duration interval) {
+    PoolGroup group;
+    group.name = name;
+    group.pools = std::move(pools);
+    group.adjust_func = std::move(adjust_func);
+    group.max_threads_per_cpu = max_threads_per_cpu;
+    group.min_threads_per_cpu = min_threads_per_cpu;
+    group.current_threads = group.get_max_threads();
+
+    // Capture for the LOG below before group is moved into the map.
+    int log_max = group.get_max_threads();
+    int log_min = group.get_min_threads();
+
+    // Schedule the recurring adjustment on the shared timer.
+    Timer::EventId event_id =
+            _timer->schedule_recurring(interval, [this, name] { 
_fire_group(name); });
+
+    {
+        std::lock_guard<std::mutex> lk(_mutex);
+        _pool_groups[name] = std::move(group);

Review Comment:
   **Duplicate name silently leaks old Timer event:** If `add()` is called with 
a name that already exists in `_pool_groups`, the old `Timer::EventId` is 
overwritten without being cancelled first, leaking the recurring timer event. 
Consider adding:
   ```cpp
   // Cancel any existing registration with the same name.
   cancel(name);
   ```
   at the top of `add()`, before scheduling the new event.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to