This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch branch-4.1
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-4.1 by this push:
new e01d7c87679 branch-4.1: [fix](memtable) use CPU metrics delta for CPU
pressure detection in adaptive flush controller #62744 (#62943)
e01d7c87679 is described below
commit e01d7c87679b4145fd2fc8e52ca8e782e75e374e
Author: github-actions[bot]
<41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Wed May 13 17:06:18 2026 +0800
branch-4.1: [fix](memtable) use CPU metrics delta for CPU pressure
detection in adaptive flush controller #62744 (#62943)
Cherry-picked from #62744
Co-authored-by: hui lai <[email protected]>
---
be/src/common/metrics/system_metrics.cpp | 25 ++++++++
be/src/common/metrics/system_metrics.h | 11 ++++
be/src/storage/adaptive_thread_pool_controller.cpp | 35 +++++++++--
be/src/storage/adaptive_thread_pool_controller.h | 9 ++-
.../adaptive_thread_pool_controller_test.cpp | 69 ++++++++++++++++++++++
be/test/util/test_data/stat_cpu_busy_after | 1 +
be/test/util/test_data/stat_cpu_busy_before | 1 +
be/test/util/test_data/stat_cpu_iowait_after | 1 +
be/test/util/test_data/stat_cpu_recovery_after | 1 +
be/test/util/test_data/stat_cpu_regressed_after | 1 +
10 files changed, 148 insertions(+), 6 deletions(-)
diff --git a/be/src/common/metrics/system_metrics.cpp b/be/src/common/metrics/system_metrics.cpp
index f111c04b491..da88b7b8094 100644
--- a/be/src/common/metrics/system_metrics.cpp
+++ b/be/src/common/metrics/system_metrics.cpp
@@ -468,6 +468,17 @@ void SystemMetrics::_update_cpu_metrics() {
continue;
}
+ if (cpu_name == "cpu") {
+ AggregateCpuTime aggregate_cpu_time;
+ aggregate_cpu_time.total_time = values[0] + values[1] + values[2] + values[3] +
+ values[4] + values[5] + values[6] + values[7];
+ aggregate_cpu_time.idle_time = values[3] + values[4];
+ aggregate_cpu_time.initialized = aggregate_cpu_time.total_time > 0;
+ // Publish a consistent aggregate snapshot derived from one /proc/stat row.
+ std::lock_guard<std::mutex> lk(_aggregate_cpu_time_mutex);
+ _aggregate_cpu_time = aggregate_cpu_time;
+ }
+
for (int i = 0; i < CpuMetrics::cpu_num_metrics; ++i) {
it->second->metrics[i]->set_value(values[i]);
}
@@ -909,6 +920,20 @@ double SystemMetrics::get_load_average_1_min() {
}
}
+bool SystemMetrics::get_aggregate_cpu_time(int64_t* total_time, int64_t* idle_time) const {
+ DCHECK(total_time != nullptr);
+ DCHECK(idle_time != nullptr);
+
+ std::lock_guard<std::mutex> lk(_aggregate_cpu_time_mutex);
+ if (!_aggregate_cpu_time.initialized) {
+ return false;
+ }
+
+ *total_time = _aggregate_cpu_time.total_time;
+ *idle_time = _aggregate_cpu_time.idle_time;
+ return true;
+}
+
void SystemMetrics::get_network_traffic(std::map<std::string, int64_t>* send_map,
std::map<std::string, int64_t>* rcv_map) {
send_map->clear();
diff --git a/be/src/common/metrics/system_metrics.h b/be/src/common/metrics/system_metrics.h
index 52ca7299f70..a5b4ccceb7e 100644
--- a/be/src/common/metrics/system_metrics.h
+++ b/be/src/common/metrics/system_metrics.h
@@ -22,6 +22,7 @@
#include <map>
#include <memory>
+#include <mutex>
#include <set>
#include <string>
#include <vector>
@@ -59,6 +60,7 @@ public:
int64_t interval_sec, int64_t* send_rate,
int64_t* rcv_rate);
double get_load_average_1_min();
+ bool get_aggregate_cpu_time(int64_t* total_time, int64_t* idle_time) const;
void update_max_disk_io_util_percent(const std::map<std::string, int64_t>& lst_value,
int64_t interval_sec);
@@ -99,6 +101,12 @@ private:
void get_cpu_name();
private:
+ struct AggregateCpuTime {
+ int64_t total_time = 0;
+ int64_t idle_time = 0;
+ bool initialized = false;
+ };
+
static const char* _s_hook_name;
std::map<std::string, CpuMetrics*> _cpu_metrics;
@@ -118,6 +126,9 @@ private:
MetricRegistry* _registry = nullptr;
std::shared_ptr<MetricEntity> _server_entity;
+ mutable std::mutex _aggregate_cpu_time_mutex;
+ AggregateCpuTime _aggregate_cpu_time;
+
IntGauge* max_disk_io_util_percent = nullptr;
IntGauge* max_network_send_bytes_rate = nullptr;
IntGauge* max_network_receive_bytes_rate = nullptr;
diff --git a/be/src/storage/adaptive_thread_pool_controller.cpp b/be/src/storage/adaptive_thread_pool_controller.cpp
index 8b8de54654d..bcb7b089389 100644
--- a/be/src/storage/adaptive_thread_pool_controller.cpp
+++ b/be/src/storage/adaptive_thread_pool_controller.cpp
@@ -252,6 +252,8 @@ int AdaptiveThreadPoolController::get_current_threads(const std::string& name) c
}
bool AdaptiveThreadPoolController::is_io_busy() {
+ std::lock_guard<std::mutex> lk(_metrics_state_mutex);
+
if (config::is_cloud_mode()) {
if (_s3_file_upload_pool == nullptr) return false;
int queue_size = _s3_file_upload_pool->get_queue_size();
@@ -275,14 +277,37 @@ bool AdaptiveThreadPoolController::is_io_busy() {
}
bool AdaptiveThreadPoolController::is_cpu_busy() {
+ std::lock_guard<std::mutex> lk(_metrics_state_mutex);
+
if (_system_metrics == nullptr) return false;
- double load_avg = _system_metrics->get_load_average_1_min();
- int num_cpus = std::thread::hardware_concurrency();
- if (num_cpus <= 0) return false;
+ int64_t total_time = 0;
+ int64_t idle_time = 0;
+ if (!_system_metrics->get_aggregate_cpu_time(&total_time, &idle_time)) {
+ return _last_cpu_busy;
+ }
+
+ if (_last_cpu_total_time < 0) {
+ _last_cpu_total_time = total_time;
+ _last_cpu_idle_time = idle_time;
+ _last_cpu_busy = false;
+ return false;
+ }
+
+ int64_t total_time_delta = total_time - _last_cpu_total_time;
+ int64_t idle_time_delta = idle_time - _last_cpu_idle_time;
+ if (total_time_delta <= 0 || idle_time_delta < 0 || idle_time_delta > total_time_delta) {
+ // Keep the previous baseline so a transient invalid sample does not
+ // poison the next interval's delta calculation.
+ return _last_cpu_busy;
+ }
- double cpu_usage_percent = (load_avg / num_cpus) * 100.0;
- return cpu_usage_percent > kCPUBusyThresholdPercent;
+ _last_cpu_total_time = total_time;
+ _last_cpu_idle_time = idle_time;
+ double cpu_busy_percent =
+ static_cast<double>(total_time_delta - idle_time_delta) * 100.0 / total_time_delta;
+ _last_cpu_busy = cpu_busy_percent > kCPUBusyThresholdPercent;
+ return _last_cpu_busy;
}
AdaptiveThreadPoolController::AdjustFunc
AdaptiveThreadPoolController::make_flush_adjust_func(
diff --git a/be/src/storage/adaptive_thread_pool_controller.h b/be/src/storage/adaptive_thread_pool_controller.h
index 4a6f0096f62..0598531268c 100644
--- a/be/src/storage/adaptive_thread_pool_controller.h
+++ b/be/src/storage/adaptive_thread_pool_controller.h
@@ -56,7 +56,7 @@ struct TimerArg {
};
// AdaptiveThreadPoolController dynamically adjusts thread pool sizes based on
-// system load (IO utilisation, CPU load average, flush queue depth).
+// system load (IO utilisation, CPU utilisation, flush queue depth).
//
// Each registered pool group runs as a one-shot bthread_timer_add chain: the
// callback fires, adjusts the pool, then re-registers the next one-shot timer.
@@ -142,6 +142,7 @@ private:
ThreadPool* _s3_file_upload_pool = nullptr;
mutable std::mutex _mutex;
+ mutable std::mutex _metrics_state_mutex;
std::map<std::string, PoolGroup> _pool_groups;
// Last successfully computed IO-busy result. Returned as-is when the
@@ -151,6 +152,12 @@ private:
// For disk IO util calculation (used by is_io_busy).
std::map<std::string, int64_t> _last_disk_io_time;
int64_t _last_check_time_sec = 0;
+
+ // For CPU util calculation (used by is_cpu_busy). The counters come from
+ // SystemMetrics' existing cpu_* metrics and are compared as deltas.
+ bool _last_cpu_busy = false;
+ int64_t _last_cpu_total_time = -1;
+ int64_t _last_cpu_idle_time = -1;
};
} // namespace doris
diff --git a/be/test/storage/adaptive_thread_pool_controller_test.cpp b/be/test/storage/adaptive_thread_pool_controller_test.cpp
index b2d6464fcef..06d79629330 100644
--- a/be/test/storage/adaptive_thread_pool_controller_test.cpp
+++ b/be/test/storage/adaptive_thread_pool_controller_test.cpp
@@ -22,10 +22,23 @@
#include <thread>
#include "common/config.h"
+#include "common/metrics/metrics.h"
+#include "common/metrics/system_metrics.h"
+#include "testutil/test_util.h"
#include "util/threadpool.h"
namespace doris {
+extern const char* k_ut_stat_path;
+
+namespace {
+
+std::string get_stat_test_data_path(const std::string& file_name) {
+ return GetCurrentRunningDir() + "/util/test_data/" + file_name;
+}
+
+} // namespace
+
class AdaptiveThreadPoolControllerTest : public testing::Test {
protected:
void SetUp() override {
@@ -242,6 +255,62 @@ TEST_F(AdaptiveThreadPoolControllerTest, TestIoBusyCpuBusyWithNullMetrics) {
EXPECT_FALSE(controller.is_cpu_busy());
}
+TEST_F(AdaptiveThreadPoolControllerTest, TestCpuBusyUsesCpuMetricsDelta) {
+ MetricRegistry registry("test");
+ const std::string before_path = get_stat_test_data_path("stat_cpu_busy_before");
+ const std::string after_path = get_stat_test_data_path("stat_cpu_busy_after");
+ k_ut_stat_path = before_path.c_str();
+ SystemMetrics metrics(&registry, {}, {});
+ metrics.update();
+
+ AdaptiveThreadPoolController controller;
+ controller.init(&metrics, nullptr);
+
+ EXPECT_FALSE(controller.is_cpu_busy());
+ k_ut_stat_path = after_path.c_str();
+ metrics.update();
+ EXPECT_TRUE(controller.is_cpu_busy());
+}
+
+TEST_F(AdaptiveThreadPoolControllerTest, TestCpuBusyTreatsIoWaitAsIdle) {
+ MetricRegistry registry("test");
+ const std::string before_path = get_stat_test_data_path("stat_cpu_busy_before");
+ const std::string after_path = get_stat_test_data_path("stat_cpu_iowait_after");
+ k_ut_stat_path = before_path.c_str();
+ SystemMetrics metrics(&registry, {}, {});
+ metrics.update();
+
+ AdaptiveThreadPoolController controller;
+ controller.init(&metrics, nullptr);
+
+ EXPECT_FALSE(controller.is_cpu_busy());
+ k_ut_stat_path = after_path.c_str();
+ metrics.update();
+ EXPECT_FALSE(controller.is_cpu_busy());
+}
+
+TEST_F(AdaptiveThreadPoolControllerTest, TestCpuBusyInvalidSampleDoesNotAdvanceBaseline) {
+ MetricRegistry registry("test");
+ const std::string before_path = get_stat_test_data_path("stat_cpu_busy_before");
+ const std::string invalid_path = get_stat_test_data_path("stat_cpu_regressed_after");
+ const std::string recovery_path = get_stat_test_data_path("stat_cpu_recovery_after");
+ k_ut_stat_path = before_path.c_str();
+ SystemMetrics metrics(&registry, {}, {});
+ metrics.update();
+
+ AdaptiveThreadPoolController controller;
+ controller.init(&metrics, nullptr);
+
+ EXPECT_FALSE(controller.is_cpu_busy());
+ k_ut_stat_path = invalid_path.c_str();
+ metrics.update();
+ EXPECT_FALSE(controller.is_cpu_busy());
+
+ k_ut_stat_path = recovery_path.c_str();
+ metrics.update();
+ EXPECT_TRUE(controller.is_cpu_busy());
+}
+
// Test adjust function that uses controller's is_io_busy/is_cpu_busy
TEST_F(AdaptiveThreadPoolControllerTest, TestAdjustFuncWithControllerMethods) {
config::enable_adaptive_flush_threads = true;
diff --git a/be/test/util/test_data/stat_cpu_busy_after b/be/test/util/test_data/stat_cpu_busy_after
new file mode 100644
index 00000000000..2b5027df63d
--- /dev/null
+++ b/be/test/util/test_data/stat_cpu_busy_after
@@ -0,0 +1 @@
+cpu 250 0 200 820 0 0 0 0 0 0
diff --git a/be/test/util/test_data/stat_cpu_busy_before b/be/test/util/test_data/stat_cpu_busy_before
new file mode 100644
index 00000000000..bcf77b76c25
--- /dev/null
+++ b/be/test/util/test_data/stat_cpu_busy_before
@@ -0,0 +1 @@
+cpu 100 0 100 800 0 0 0 0 0 0
diff --git a/be/test/util/test_data/stat_cpu_iowait_after b/be/test/util/test_data/stat_cpu_iowait_after
new file mode 100644
index 00000000000..c58b33ea7ca
--- /dev/null
+++ b/be/test/util/test_data/stat_cpu_iowait_after
@@ -0,0 +1 @@
+cpu 120 0 110 810 260 0 0 0 0 0
diff --git a/be/test/util/test_data/stat_cpu_recovery_after b/be/test/util/test_data/stat_cpu_recovery_after
new file mode 100644
index 00000000000..3ecc01524b6
--- /dev/null
+++ b/be/test/util/test_data/stat_cpu_recovery_after
@@ -0,0 +1 @@
+cpu 200 0 200 819 0 0 0 0 0 0
diff --git a/be/test/util/test_data/stat_cpu_regressed_after b/be/test/util/test_data/stat_cpu_regressed_after
new file mode 100644
index 00000000000..111f0f30706
--- /dev/null
+++ b/be/test/util/test_data/stat_cpu_regressed_after
@@ -0,0 +1 @@
+cpu 90 0 90 770 0 0 0 0 0 0
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]