github-actions[bot] commented on code in PR #60344:
URL: https://github.com/apache/doris/pull/60344#discussion_r2925187722


##########
be/src/cloud/cloud_meta_mgr.cpp:
##########
@@ -423,6 +504,10 @@ Status retry_rpc(std::string_view op_name, const Request& 
req, Response* res,
         } else if (res->status().code() == MetaServiceCode::INVALID_ARGUMENT) {
             return Status::Error<ErrorCode::INVALID_ARGUMENT, false>("failed 
to {}: {}", op_name,
                                                                      
res->status().msg());
+        } else if (res->status().code() == MetaServiceCode::MAX_QPS_LIMIT) {
+            // TODO(bobhan1): change to correct error code
+            rate_limit_ctx.backpressure_handler->on_ms_busy();

Review Comment:
   **Bug (Critical): Null pointer dereference**
   
   `rate_limit_ctx.backpressure_handler` can be `nullptr`. The 
`RpcRateLimitCtx` struct defaults `backpressure_handler` to `nullptr`, and 
`get_cluster_status` (line ~2516) passes `{.host_limiters = 
host_level_ms_rpc_rate_limiters_}` without setting `.backpressure_handler`. If 
MS returns `MAX_QPS_LIMIT` for that RPC, this will crash.
   
   Both `apply_rate_limit()` (line 418) and `record_rpc_qps()` (line 446) 
correctly null-check `ctx.backpressure_handler` before use. The same guard is 
needed here.
   
   Suggested fix:
   ```cpp
   } else if (res->status().code() == MetaServiceCode::MAX_QPS_LIMIT) {
       if (rate_limit_ctx.backpressure_handler) {
           rate_limit_ctx.backpressure_handler->on_ms_busy();
       }
       // MS_BUSY should also be retried
   ```



##########
be/src/cloud/cloud_meta_mgr.cpp:
##########
@@ -631,6 +727,12 @@ Status 
CloudMetaMgr::sync_tablet_rowsets_unlocked(CloudTablet* tablet,
             return Status::NotFound("failed to get rowset meta: {}, {}", 
resp.status().msg(),
                                     tablet_info);
         }
+        if (resp.status().code() == MetaServiceCode::MAX_QPS_LIMIT) {
+            // TODO(bobhan1): change to correct error code
+            ms_backpressure_handler_->on_ms_busy();

Review Comment:
   **Bug (Critical): Null pointer dereference + Infinite loop**
   
   Two issues here:
   
   **1. Null deref:** `ms_backpressure_handler_` is initialized to `nullptr` 
(see `cloud_meta_mgr.h`) and only set via `set_ms_backpressure_handler()`. If 
this setter was never called, or called with null, a `MAX_QPS_LIMIT` response 
will crash. Line 698 correctly null-checks `host_level_ms_rpc_rate_limiters_` 
in the same function — the backpressure handler should get the same treatment.
   
   **2. Infinite busy-loop:** The `continue` on line 734 skips incrementing 
`tried` and has no backoff/sleep. If MS keeps returning `MAX_QPS_LIMIT`, this 
is an unbounded busy-loop consuming 100% CPU. Contrast with `retry_rpc()` which 
increments `retry_times` and applies exponential backoff.
   
   Suggested fix:
   ```cpp
   if (resp.status().code() == MetaServiceCode::MAX_QPS_LIMIT) {
       if (ms_backpressure_handler_) {
           ms_backpressure_handler_->on_ms_busy();
       }
       ++tried; // Count against retry limit
       // Apply backoff similar to retry_rpc
       bthread_usleep(std::min(1000 * (1 << tried), 60000) * 1000ULL);
       continue;
   }
   ```



##########
be/src/cloud/cloud_ms_backpressure_handler.cpp:
##########
@@ -0,0 +1,517 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "cloud/cloud_ms_backpressure_handler.h"
+
+#include <fmt/format.h>
+#include <glog/logging.h>
+
+#include <algorithm>
+#include <cmath>
+#include <queue>
+#include <thread>
+
+#include "cloud/config.h"
+#include "common/status.h"
+#include "util/thread.h"
+
+namespace doris::cloud {
+
+// Global bvar metrics
+bvar::Adder<uint64_t> 
g_backpressure_upgrade_count("ms_rpc_backpressure_upgrade_count");
+bvar::Window<bvar::Adder<uint64_t>> 
g_backpressure_upgrade_60s("ms_rpc_backpressure_upgrade_60s",
+                                                               
&g_backpressure_upgrade_count, 60);
+bvar::Adder<uint64_t> 
g_backpressure_downgrade_count("ms_rpc_backpressure_downgrade_count");
+bvar::Window<bvar::Adder<uint64_t>> g_backpressure_downgrade_60s(
+        "ms_rpc_backpressure_downgrade_60s", &g_backpressure_downgrade_count, 
60);
+bvar::LatencyRecorder g_throttle_wait_prepare_rowset(
+        "ms_rpc_backpressure_throttle_wait_prepare_rowset");
+bvar::LatencyRecorder g_throttle_wait_commit_rowset(
+        "ms_rpc_backpressure_throttle_wait_commit_rowset");
+bvar::LatencyRecorder g_throttle_wait_update_tmp_rowset(
+        "ms_rpc_backpressure_throttle_wait_update_tmp_rowset");
+bvar::LatencyRecorder g_throttle_wait_update_packed_file_info(
+        "ms_rpc_backpressure_throttle_wait_update_packed_file_info");
+bvar::LatencyRecorder g_throttle_wait_update_delete_bitmap(
+        "ms_rpc_backpressure_throttle_wait_update_delete_bitmap");
+bvar::Adder<uint64_t> g_ms_busy_count("ms_rpc_backpressure_ms_busy_count");
+bvar::Window<bvar::Adder<uint64_t>> 
g_ms_busy_60s("ms_rpc_backpressure_ms_busy_60s",
+                                                  &g_ms_busy_count, 60);
+
+static bvar::LatencyRecorder* s_throttle_wait_recorders[] = {
+        &g_throttle_wait_prepare_rowset,       &g_throttle_wait_commit_rowset,
+        &g_throttle_wait_update_tmp_rowset,    
&g_throttle_wait_update_packed_file_info,
+        &g_throttle_wait_update_delete_bitmap,
+};
+
+bvar::LatencyRecorder* get_throttle_wait_recorder(LoadRelatedRpc rpc) {
+    size_t idx = static_cast<size_t>(rpc);
+    if (idx >= static_cast<size_t>(LoadRelatedRpc::COUNT)) {
+        return nullptr;
+    }
+    return s_throttle_wait_recorders[idx];
+}
+
+// ============== StrictQpsLimiter ==============
+
+StrictQpsLimiter::StrictQpsLimiter(double qps) {
+    if (qps <= 0) {
+        qps = 1.0;
+    }
+    _interval_ns = static_cast<int64_t>(1e9 / qps);
+    _next_allowed_time = Clock::now();
+}
+
+StrictQpsLimiter::Clock::time_point StrictQpsLimiter::reserve() {
+    std::lock_guard lock(_mtx);
+    auto now = Clock::now();
+    if (_next_allowed_time <= now) {
+        _next_allowed_time = now + std::chrono::nanoseconds(_interval_ns);
+        return now;
+    }
+    auto result = _next_allowed_time;
+    _next_allowed_time += std::chrono::nanoseconds(_interval_ns);
+    return result;
+}
+
+void StrictQpsLimiter::update_qps(double new_qps) {
+    if (new_qps <= 0) {
+        new_qps = 1.0;
+    }
+    std::lock_guard lock(_mtx);
+    _interval_ns = static_cast<int64_t>(1e9 / new_qps);
+}
+
+double StrictQpsLimiter::get_qps() const {
+    std::lock_guard lock(_mtx);
+    if (_interval_ns <= 0) {
+        return 0;
+    }
+    return 1e9 / _interval_ns;
+}
+
+// ============== TableRpcQpsCounter ==============
+
+TableRpcQpsCounter::TableRpcQpsCounter(int64_t table_id, LoadRelatedRpc 
rpc_type, int window_sec)
+        : _table_id(table_id), _rpc_type(rpc_type) {
+    _counter = std::make_unique<bvar::Adder<int64_t>>();
+    _counter->hide();
+    _qps = 
std::make_unique<bvar::PerSecond<bvar::Adder<int64_t>>>(_counter.get(), 
window_sec);
+    _qps->hide();
+}
+
+void TableRpcQpsCounter::increment() {
+    (*_counter) << 1;
+}
+
+double TableRpcQpsCounter::get_qps() const {
+    return _qps->get_value();
+}
+
+// ============== TableRpcQpsRegistry ==============
+
+TableRpcQpsRegistry::TableRpcQpsRegistry() = default;
+
+void TableRpcQpsRegistry::record(LoadRelatedRpc rpc_type, int64_t table_id) {
+    auto* counter = get_or_create_counter(rpc_type, table_id);
+    if (counter) {
+        counter->increment();
+    }
+}
+
+TableRpcQpsCounter* TableRpcQpsRegistry::get_or_create_counter(LoadRelatedRpc 
rpc_type,
+                                                               int64_t 
table_id) {
+    size_t idx = static_cast<size_t>(rpc_type);
+    if (idx >= static_cast<size_t>(LoadRelatedRpc::COUNT)) {
+        return nullptr;
+    }
+
+    {
+        std::shared_lock lock(_mutex);
+        auto it = _counters[idx].find(table_id);
+        if (it != _counters[idx].end()) {
+            return it->second.get();
+        }
+    }
+
+    std::unique_lock lock(_mutex);
+    // Double check after acquiring exclusive lock
+    auto it = _counters[idx].find(table_id);
+    if (it != _counters[idx].end()) {
+        return it->second.get();
+    }
+
+    auto counter = std::make_unique<TableRpcQpsCounter>(table_id, rpc_type,
+                                                        
config::ms_rpc_table_qps_window_sec);
+    auto* ptr = counter.get();
+    _counters[idx][table_id] = std::move(counter);
+    return ptr;
+}
+
+std::vector<std::pair<int64_t, double>> TableRpcQpsRegistry::get_top_k_tables(
+        LoadRelatedRpc rpc_type, int k) const {
+    size_t idx = static_cast<size_t>(rpc_type);
+    if (idx >= static_cast<size_t>(LoadRelatedRpc::COUNT) || k <= 0) {
+        return {};
+    }
+
+    // Use a min-heap of size k to find top-k without allocating a vector for 
all tables.
+    // The heap top is the smallest among the k largest elements seen so far.
+    using Entry = std::pair<int64_t, double>; // (table_id, qps)
+    auto min_cmp = [](const Entry& a, const Entry& b) { return a.second > 
b.second; };
+    std::priority_queue<Entry, std::vector<Entry>, decltype(min_cmp)> 
min_heap(min_cmp);
+
+    {
+        std::shared_lock lock(_mutex);
+        for (const auto& [table_id, counter] : _counters[idx]) {
+            double qps = counter->get_qps();
+            if (qps > 0) {
+                if (static_cast<int>(min_heap.size()) < k) {
+                    min_heap.push({table_id, qps});
+                } else if (qps > min_heap.top().second) {
+                    min_heap.pop();
+                    min_heap.push({table_id, qps});
+                }
+            }
+        }
+    }
+
+    // Extract results from heap (comes out in ascending order, reverse to 
descending)
+    std::vector<Entry> result;
+    result.reserve(min_heap.size());
+    while (!min_heap.empty()) {
+        result.push_back(min_heap.top());
+        min_heap.pop();
+    }
+    std::reverse(result.begin(), result.end());
+
+    return result;
+}
+
+double TableRpcQpsRegistry::get_qps(LoadRelatedRpc rpc_type, int64_t table_id) 
const {
+    size_t idx = static_cast<size_t>(rpc_type);
+    if (idx >= static_cast<size_t>(LoadRelatedRpc::COUNT)) {
+        return 0;
+    }
+
+    std::shared_lock lock(_mutex);
+    auto it = _counters[idx].find(table_id);
+    if (it != _counters[idx].end()) {
+        return it->second->get_qps();
+    }
+    return 0;
+}
+
+void TableRpcQpsRegistry::cleanup_inactive_tables() {
+    std::unique_lock lock(_mutex);
+
+    for (size_t idx = 0; idx < static_cast<size_t>(LoadRelatedRpc::COUNT); 
++idx) {
+        auto& counter_map = _counters[idx];
+        for (auto it = counter_map.begin(); it != counter_map.end();) {
+            // Remove counters with zero QPS for a long time
+            if (it->second->get_qps() < 0.01) {
+                it = counter_map.erase(it);
+            } else {
+                ++it;
+            }
+        }
+    }
+}
+
+// ============== TableRpcThrottler ==============
+
+TableRpcThrottler::TableRpcThrottler() {
+    // Initialize bvar for throttled table counts
+    for (size_t i = 0; i < static_cast<size_t>(LoadRelatedRpc::COUNT); ++i) {
+        std::string bvar_name = 
fmt::format("ms_rpc_backpressure_throttled_tables_{}",
+                                            
load_related_rpc_name(static_cast<LoadRelatedRpc>(i)));
+        _throttled_table_counts[i] = 
std::make_unique<bvar::Status<size_t>>(bvar_name, 0);
+    }
+}
+
+std::chrono::steady_clock::time_point 
TableRpcThrottler::throttle(LoadRelatedRpc rpc_type,
+                                                                  int64_t 
table_id) {
+    std::shared_lock lock(_mutex);
+    auto it = _limiters.find({rpc_type, table_id});
+    if (it == _limiters.end()) {
+        return std::chrono::steady_clock::now();
+    }
+    return it->second->reserve();
+}
+
+void TableRpcThrottler::set_qps_limit(LoadRelatedRpc rpc_type, int64_t 
table_id, double qps_limit) {
+    if (qps_limit <= 0) {
+        return;
+    }
+
+    std::unique_lock lock(_mutex);
+    auto key = std::make_pair(rpc_type, table_id);
+    auto it = _limiters.find(key);
+    if (it != _limiters.end()) {
+        it->second->update_qps(qps_limit);
+    } else {
+        _limiters[key] = std::make_unique<StrictQpsLimiter>(qps_limit);
+        // Update bvar count
+        size_t idx = static_cast<size_t>(rpc_type);
+        if (idx < static_cast<size_t>(LoadRelatedRpc::COUNT)) {
+            size_t count = 0;
+            for (const auto& [k, _] : _limiters) {
+                if (k.first == rpc_type) {
+                    ++count;
+                }
+            }
+            _throttled_table_counts[idx]->set_value(count);
+        }
+    }
+
+    LOG(INFO) << "[ms-throttle] set table QPS limit: rpc=" << 
load_related_rpc_name(rpc_type)
+              << ", table_id=" << table_id << ", qps_limit=" << qps_limit;
+}
+
+void TableRpcThrottler::remove_qps_limit(LoadRelatedRpc rpc_type, int64_t 
table_id) {
+    std::unique_lock lock(_mutex);
+    auto key = std::make_pair(rpc_type, table_id);
+    auto it = _limiters.find(key);
+    if (it != _limiters.end()) {
+        _limiters.erase(it);
+        // Update bvar count
+        size_t idx = static_cast<size_t>(rpc_type);
+        if (idx < static_cast<size_t>(LoadRelatedRpc::COUNT)) {
+            size_t count = 0;
+            for (const auto& [k, _] : _limiters) {
+                if (k.first == rpc_type) {
+                    ++count;
+                }
+            }
+            _throttled_table_counts[idx]->set_value(count);
+        }
+
+        LOG(INFO) << "[ms-throttle] removed table QPS limit: rpc="
+                  << load_related_rpc_name(rpc_type) << ", table_id=" << 
table_id;
+    }
+}
+
+double TableRpcThrottler::get_qps_limit(LoadRelatedRpc rpc_type, int64_t 
table_id) const {
+    std::shared_lock lock(_mutex);
+    auto it = _limiters.find({rpc_type, table_id});
+    if (it != _limiters.end()) {
+        return it->second->get_qps();
+    }
+    return 0;
+}
+
+bool TableRpcThrottler::has_limit(LoadRelatedRpc rpc_type, int64_t table_id) 
const {
+    std::shared_lock lock(_mutex);
+    return _limiters.find({rpc_type, table_id}) != _limiters.end();
+}
+
+size_t TableRpcThrottler::get_throttled_table_count(LoadRelatedRpc rpc_type) 
const {
+    size_t idx = static_cast<size_t>(rpc_type);
+    if (idx >= static_cast<size_t>(LoadRelatedRpc::COUNT)) {
+        return 0;
+    }
+    return _throttled_table_counts[idx]->get_value();
+}
+
+std::vector<TableRpcThrottler::ThrottleEntry> 
TableRpcThrottler::get_all_throttled_entries() const {
+    std::shared_lock lock(_mutex);
+    std::vector<ThrottleEntry> entries;
+    entries.reserve(_limiters.size());
+    for (const auto& [key, limiter] : _limiters) {
+        entries.push_back({key.first, key.second, limiter->get_qps()});
+    }
+    return entries;
+}
+
+// ============== MSBackpressureHandler ==============
+
+MSBackpressureHandler::MSBackpressureHandler(TableRpcQpsRegistry* qps_registry,
+                                             TableRpcThrottler* throttler)
+        : _qps_registry(qps_registry),
+          _throttler(throttler),
+          _stop_latch(1),
+          _last_ms_busy_time(std::chrono::steady_clock::time_point::min()) {
+    // Initialize state machine with config values
+    RpcThrottleParams throttle_params {
+            .top_k = config::ms_backpressure_upgrade_top_k,
+            .ratio = config::ms_backpressure_throttle_ratio,
+            .floor_qps = config::ms_rpc_table_qps_limit_floor,
+    };
+    _state_machine = 
std::make_unique<RpcThrottleStateMachine>(throttle_params);
+
+    // Initialize coordinator with config values
+    // Coordinator uses ticks where 1 tick = 1 millisecond (fixed unit)
+    // This allows tick_interval_ms to change at runtime without affecting 
correctness
+    ThrottleCoordinatorParams coordinator_params {
+            .upgrade_cooldown_ticks = 
config::ms_backpressure_upgrade_interval_ms,
+            .downgrade_after_ticks = 
config::ms_backpressure_downgrade_interval_ms,
+    };
+    _coordinator = 
std::make_unique<RpcThrottleCoordinator>(coordinator_params);
+
+    auto st = Thread::create(
+            "MSBackpressureHandler", "tick_thread", [this]() { 
this->_tick_thread_callback(); },
+            &_tick_thread);
+    if (!st.ok()) {
+        LOG(WARNING) << "[ms-throttle] failed to create tick thread: " << st;
+    } else {
+        LOG(INFO) << "[ms-throttle] handler started: upgrade_cooldown="
+                  << config::ms_backpressure_upgrade_interval_ms
+                  << "ms, downgrade_interval=" << 
config::ms_backpressure_downgrade_interval_ms
+                  << "ms";
+    }
+}
+
+MSBackpressureHandler::~MSBackpressureHandler() {
+    _stop_latch.count_down();
+    if (_tick_thread) {
+        _tick_thread->join();
+    }
+}
+
+void MSBackpressureHandler::_tick_thread_callback() {
+    // Fixed tick interval: 1 second. Since 1 tick = 1 ms, advance by 1000 
ticks each iteration.
+    constexpr int kTickIntervalMs = 1000;
+    while (!_stop_latch.wait_for(std::chrono::milliseconds(kTickIntervalMs))) {
+        _advance_time(kTickIntervalMs);
+    }
+}
+
+void MSBackpressureHandler::_advance_time(int ticks) {
+    if (!config::enable_ms_backpressure_handling) {
+        return;
+    }
+
+    // Advance coordinator time; if downgrade is triggered, handle it
+    if (_coordinator->tick(ticks)) {
+        LOG(INFO) << "[ms-throttle] triggering downgrade, upgrade_level="
+                  << _state_machine->upgrade_level();
+
+        auto actions = _state_machine->on_downgrade();
+        _apply_actions(actions);
+        _coordinator->set_has_pending_upgrades(_state_machine->upgrade_level() 
> 0);
+
+        g_backpressure_downgrade_count << 1;
+    }
+}
+
+bool MSBackpressureHandler::on_ms_busy() {

Review Comment:
   **Concurrency concern (Medium): TOCTOU race between `on_ms_busy()` and 
`_advance_time()`**
   
   Both `on_ms_busy()` (called from RPC threads) and `_advance_time()` (called 
from the tick thread) perform a multi-step sequence: coordinator decision → 
state machine mutation → `_apply_actions()` → coordinator feedback 
(`set_has_pending_upgrades`). Each individual call to `_coordinator` and 
`_state_machine` is internally synchronized, but the **sequence is not atomic**.
   
   Possible interleaving:
   1. Thread A (`on_ms_busy`): `_coordinator->report_ms_busy()` → returns 
`true` (upgrade)
   2. Thread B (`_advance_time`): `_coordinator->tick()` → returns `true` 
(downgrade)
   3. Thread B: `_state_machine->on_downgrade()` — may undo a previous upgrade
   4. Thread A: `_state_machine->on_upgrade(snapshot)` — applies upgrade
   5. Both set `_coordinator->set_has_pending_upgrades()` with potentially 
stale state
   
   The system is self-correcting (next `MS_BUSY` re-triggers), so this is **not 
a crash bug**, but can cause momentary incorrect throttle limits. Consider 
wrapping the entire upgrade/downgrade sequence in a handler-level mutex.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to