This is an automated email from the ASF dual-hosted git repository.
mrhhsg pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new d23d9ca4d03 [fix](be) Restore workload group limit refresh after
memory recovery (#62070)
d23d9ca4d03 is described below
commit d23d9ca4d03ff357cd4da9e850304ad2cc0ea173
Author: Jerry Hu <[email protected]>
AuthorDate: Fri Apr 10 12:38:47 2026 +0800
[fix](be) Restore workload group limit refresh after memory recovery
(#62070)
### What problem does this PR solve?
Problem Summary:
1. refresh_workload_group_memory_state() returned early when aggregate
workload-group memory usage was zero. That skipped
update_queries_limit_(), so per-query memory limits could stay
artificially low after memory pressure had already eased.
2. Paused workload queries were not handled correctly around recent
query cancellation. The manager could return too early, fail to clear
resumed or expired paused entries, and skip restoring effective query
memory limits under recovery. This change fixes those branches and adds
unit tests that directly cover the recent-cancel and Phase 3 cleanup
paths.
Issue Number: close #xxx
Related PR: #xxx
Problem Summary:
### Release note
None
### Check List (For Author)
- Test <!-- At least one of them must be included. -->
- [ ] Regression test
- [ ] Unit Test
- [ ] Manual test (add detailed scripts or steps below)
- [ ] No need to test or manual test. Explain why:
- [ ] This is a refactor/code format and no logic has been changed.
- [ ] Previous test can cover this change.
- [ ] No code files have been changed.
- [ ] Other reason <!-- Add your reason? -->
- Behavior changed:
- [ ] No.
- [ ] Yes. <!-- Explain the behavior change -->
- Does this need documentation?
- [ ] No.
- [ ] Yes. <!-- Add document PR link here. eg:
https://github.com/apache/doris-website/pull/1214 -->
### Check List (For Reviewer who merge this PR)
- [ ] Confirm the release note
- [ ] Confirm test cases
- [ ] Confirm document
- [ ] Add branch pick label <!-- Add branch pick label that this PR
should merge into -->
Co-authored-by: Copilot <[email protected]>
---
.../workload_group/workload_group_manager.cpp | 103 ++--
.../workload_group/workload_group_manager_test.cpp | 516 +++++++++++++++++++++
2 files changed, 593 insertions(+), 26 deletions(-)
diff --git a/be/src/runtime/workload_group/workload_group_manager.cpp
b/be/src/runtime/workload_group/workload_group_manager.cpp
index d869862440b..3d5893b9a06 100644
--- a/be/src/runtime/workload_group/workload_group_manager.cpp
+++ b/be/src/runtime/workload_group/workload_group_manager.cpp
@@ -231,16 +231,14 @@ void
WorkloadGroupMgr::refresh_workload_group_memory_state() {
for (auto& [wg_id, wg] : _workload_groups) {
all_workload_groups_mem_usage += wg->refresh_memory_usage();
}
- if (all_workload_groups_mem_usage <= 0) {
- return;
+ if (all_workload_groups_mem_usage > 0) {
+ std::string debug_msg = fmt::format(
+ "\nProcess Memory Summary: {}, {}, all workload groups memory
usage: {}",
+
doris::GlobalMemoryArbitrator::process_memory_used_details_str(),
+ doris::GlobalMemoryArbitrator::sys_mem_available_details_str(),
+ PrettyPrinter::print(all_workload_groups_mem_usage,
TUnit::BYTES));
+ LOG_EVERY_T(INFO, 60) << debug_msg;
}
-
- std::string debug_msg =
- fmt::format("\nProcess Memory Summary: {}, {}, all workload groups
memory usage: {}",
-
doris::GlobalMemoryArbitrator::process_memory_used_details_str(),
-
doris::GlobalMemoryArbitrator::sys_mem_available_details_str(),
- PrettyPrinter::print(all_workload_groups_mem_usage,
TUnit::BYTES));
- LOG_EVERY_T(INFO, 60) << debug_msg;
for (auto& wg : _workload_groups) {
update_queries_limit_(wg.second, false);
}
@@ -316,8 +314,8 @@ void WorkloadGroupMgr::add_paused_query(const
std::shared_ptr<ResourceContext>&
void WorkloadGroupMgr::handle_paused_queries() {
{
std::shared_lock<std::shared_mutex> r_lock(_group_mutex);
+ std::unique_lock<std::mutex> lock(_paused_queries_lock);
for (auto& [wg_id, wg] : _workload_groups) {
- std::unique_lock<std::mutex> lock(_paused_queries_lock);
if (_paused_queries_list[wg].empty()) {
// Add an empty set to wg that not contains paused queries.
}
@@ -325,6 +323,8 @@ void WorkloadGroupMgr::handle_paused_queries() {
}
std::unique_lock<std::mutex> lock(_paused_queries_lock);
+ bool has_recently_cancelled_query = false;
+ std::set<WorkloadGroupPtr> wgs_with_recently_cancelled;
for (auto it = _paused_queries_list.begin(); it !=
_paused_queries_list.end();) {
auto& queries_list = it->second;
for (auto query_it = queries_list.begin(); query_it !=
queries_list.end();) {
@@ -335,12 +335,23 @@ void WorkloadGroupMgr::handle_paused_queries() {
query_it = queries_list.erase(query_it);
continue;
}
- // If there are any tasks that is cancelled and canceled time is
less than 15 seconds, just break.
- // because it may release memory and other tasks may not be
cancelled and spill disk.
- if (resource_ctx->task_controller()->is_cancelled() &&
- resource_ctx->task_controller()->cancel_elapsed_millis() <
- config::wait_cancel_release_memory_ms) {
- return;
+ if (resource_ctx->task_controller()->is_cancelled()) {
+ if (resource_ctx->task_controller()->cancel_elapsed_millis() <
+ config::wait_cancel_release_memory_ms) {
+ // Recently cancelled — may still be releasing memory.
Record it so that
+ // PROCESS_MEMORY_EXCEEDED queries wait globally and
+ // WORKLOAD_GROUP_MEMORY_EXCEEDED queries wait within the
same WG.
+ has_recently_cancelled_query = true;
+ wgs_with_recently_cancelled.insert(it->first);
+ ++query_it;
+ } else {
+ // Cancelled long ago — no longer releasing memory. Remove
it now to
+ // avoid unnecessary processing in Phase 4.
+ LOG(INFO) << "Query: " << query_it->query_id()
+ << " was cancelled and has exceeded wait time,
erase it.";
+ query_it = queries_list.erase(query_it);
+ }
+ continue;
}
++query_it;
}
@@ -355,22 +366,40 @@ void WorkloadGroupMgr::handle_paused_queries() {
// In previous loop, some query is cancelled, and now there is no query in
cancel list. Resume all paused queries.
if (revoking_memory_from_other_query_) {
- for (auto it = _paused_queries_list.begin(); it !=
_paused_queries_list.end(); ++it) {
+ if (has_recently_cancelled_query) {
+ // Still waiting for the cancelled query to release memory.
+ return;
+ }
+ for (auto it = _paused_queries_list.begin(); it !=
_paused_queries_list.end();) {
auto& queries_list = it->second;
- for (auto query_it = queries_list.begin(); query_it !=
queries_list.end(); ++query_it) {
+ for (auto query_it = queries_list.begin(); query_it !=
queries_list.end();) {
auto resource_ctx = query_it->resource_ctx_.lock();
// The query is finished during in paused list.
if (resource_ctx == nullptr) {
LOG(INFO) << "Query: " << query_it->query_id() << " is
nullptr, erase it.";
+ query_it = queries_list.erase(query_it);
+ continue;
+ }
+ if (resource_ctx->task_controller()->is_cancelled()) {
+ LOG(INFO) << "Query: " << query_it->query_id()
+ << " is already cancelled, erase it from paused
list.";
+ query_it = queries_list.erase(query_it);
continue;
}
LOG(INFO) << "Query " <<
print_id(resource_ctx->task_controller()->task_id())
<< " is blocked due to process memory not enough,
but already "
- "cancelled some queries, resumt it now.";
+ "cancelled some queries, resume it now.";
resource_ctx->task_controller()->set_memory_sufficient(true);
+ query_it = queries_list.erase(query_it);
+ }
+ if (queries_list.empty()) {
+ it = _paused_queries_list.erase(it);
+ } else {
+ ++it;
}
}
revoking_memory_from_other_query_ = false;
+ return;
}
for (auto it = _paused_queries_list.begin(); it !=
_paused_queries_list.end();) {
@@ -399,6 +428,14 @@ void WorkloadGroupMgr::handle_paused_queries() {
continue;
}
+ // Recently cancelled queries are kept in the list by Phase 2 for
+ // delay-waiting tracking. Skip them here — they must not be
processed
+ // (spilled/cancelled/resumed) by Phase 4.
+ if (resource_ctx->task_controller()->is_cancelled()) {
+ ++query_it;
+ continue;
+ }
+
if (resource_ctx->task_controller()
->paused_reason()
.is<ErrorCode::QUERY_MEMORY_EXCEEDED>()) {
@@ -424,6 +461,12 @@ void WorkloadGroupMgr::handle_paused_queries() {
} else if (resource_ctx->task_controller()
->paused_reason()
.is<ErrorCode::WORKLOAD_GROUP_MEMORY_EXCEEDED>()) {
+ // For WORKLOAD_GROUP_MEMORY_EXCEEDED: only wait if a
cancelled query in the
+ // SAME WG is releasing memory (WG memory pools are
independent across WGs).
+ if (wgs_with_recently_cancelled.contains(wg)) {
+ ++query_it;
+ continue;
+ }
// here query is paused because of
WORKLOAD_GROUP_MEMORY_EXCEEDED,
// wg of the current query may not actually exceed the limit,
// just (wg consumption + current query expected reserve
memory > wg memory limit)
@@ -460,8 +503,8 @@ void WorkloadGroupMgr::handle_paused_queries() {
continue;
}
- // when running here, current query adjusted_mem_limit < query
memory consumption + reserve_size,
- // which means that the current query itself has not exceeded
the memory limit.
+ // when running here, current query adjusted_mem_limit >=
query memory consumption + reserve_size,
+ // which means that the current query itself has not exceeded
the adjusted memory limit.
//
// this means that there must be queries in the wg of the
current query whose memory exceeds
// adjusted_mem_limit, but these queries may not have entered
the paused state,
@@ -536,6 +579,12 @@ void WorkloadGroupMgr::handle_paused_queries() {
}
}
} else {
+ // PROCESS_MEMORY_EXCEEDED: a recently cancelled query
anywhere may be releasing
+ // process-level memory, so wait globally before spilling or
revoking.
+ if (has_recently_cancelled_query) {
+ ++query_it;
+ continue;
+ }
// If workload group's memory usage > min memory, then it
means the workload group use too much memory
// in memory contention state. Should just spill
if (wg->total_mem_used() > wg->min_memory_limit()) {
@@ -865,11 +914,13 @@ void
WorkloadGroupMgr::update_queries_limit_(WorkloadGroupPtr wg, bool enable_ha
// If the query is a pure load task, then should not modify its limit.
Or it will reserve
// memory failed and we did not hanle it.
if (!resource_ctx->task_controller()->is_pure_load_task()) {
- // If user's set mem limit is less than query weighted mem limit,
then should not modify its limit.
- // Use user settings.
- if (resource_ctx->memory_context()->user_set_mem_limit() >
query_weighted_mem_limit) {
-
resource_ctx->memory_context()->set_mem_limit(query_weighted_mem_limit);
- }
+ // The effective limit should be min(user_set_mem_limit,
query_weighted_mem_limit).
+ // This ensures limits are both lowered under memory pressure and
restored when
+ // pressure eases (e.g., when concurrent queries finish or WG
memory drops below
+ // low watermark).
+ int64_t effective_limit =
std::min(resource_ctx->memory_context()->user_set_mem_limit(),
+ query_weighted_mem_limit);
+ resource_ctx->memory_context()->set_mem_limit(effective_limit);
resource_ctx->memory_context()->set_adjusted_mem_limit(
expected_query_weighted_mem_limit);
}
diff --git a/be/test/runtime/workload_group/workload_group_manager_test.cpp
b/be/test/runtime/workload_group/workload_group_manager_test.cpp
index 2d3543d609d..a061ab719f0 100644
--- a/be/test/runtime/workload_group/workload_group_manager_test.cpp
+++ b/be/test/runtime/workload_group/workload_group_manager_test.cpp
@@ -39,6 +39,7 @@
#include "runtime/runtime_query_statistics_mgr.h"
#include "runtime/workload_group/workload_group.h"
#include "storage/olap_define.h"
+#include "testutil/mock/mock_query_task_controller.h"
namespace doris {
@@ -118,6 +119,15 @@ private:
return query_context;
}
+ MockQueryTaskController* _install_mock_query_task_controller(
+ const std::shared_ptr<QueryContext>& query_context) {
+ query_context->resource_ctx()->set_task_controller(
+
MockQueryTaskController::create(static_cast<QueryTaskController*>(
+ query_context->resource_ctx()->task_controller())));
+ return static_cast<MockQueryTaskController*>(
+ query_context->resource_ctx()->task_controller());
+ }
+
void _run_checking_loop(const std::shared_ptr<WorkloadGroup>& wg) {
CountDownLatch latch(1);
size_t check_times = 300;
@@ -536,4 +546,510 @@ TEST_F(WorkloadGroupManagerTest, ProcessMemoryNotEnough) {
EXPECT_EQ(wg3->total_mem_used(), 0); // WG3 exceed 400MB
}
+// Test Fix 1 (Phase 3): When revoking_memory_from_other_query_ is true and
cancelled queries
+// have finished, Phase 3 should resume all paused queries AND remove them
from the list,
+// then return without entering Phase 4 (which would re-process them).
+TEST_F(WorkloadGroupManagerTest, phase3_resume_removes_from_list_and_returns) {
+ auto wg = _wg_manager->get_or_create_workload_group({});
+ auto query_context1 = _generate_on_query(wg);
+ auto query_context2 = _generate_on_query(wg);
+
+ // Pause two queries due to process memory exceeded
+ _wg_manager->add_paused_query(query_context1->resource_ctx(), 1024L,
+
Status::Error(ErrorCode::PROCESS_MEMORY_EXCEEDED, "test"));
+ _wg_manager->add_paused_query(query_context2->resource_ctx(), 1024L,
+
Status::Error(ErrorCode::PROCESS_MEMORY_EXCEEDED, "test"));
+
+ {
+ std::unique_lock<std::mutex> lock(_wg_manager->_paused_queries_lock);
+ ASSERT_EQ(_wg_manager->_paused_queries_list[wg].size(), 2);
+ }
+
+ // Simulate: a previous call had cancelled a query and set revoking flag
+ _wg_manager->revoking_memory_from_other_query_ = true;
+
+ // Call handle_paused_queries — Phase 2 finds no cancelled query in the
list,
+ // Phase 3 should resume all and remove them from the list.
+ _wg_manager->handle_paused_queries();
+
+ {
+ std::unique_lock<std::mutex> lock(_wg_manager->_paused_queries_lock);
+ // All queries should be removed from paused list by Phase 3
+ ASSERT_TRUE(!_wg_manager->_paused_queries_list.contains(wg) ||
+ _wg_manager->_paused_queries_list[wg].empty())
+ << "Phase 3 should remove all resumed queries from paused
list";
+ }
+ ASSERT_FALSE(_wg_manager->revoking_memory_from_other_query_) << "revoking
flag should be reset";
+ // Queries should NOT be cancelled (Phase 4 should not have run)
+ ASSERT_FALSE(query_context1->is_cancelled()) << "query1 should be resumed,
not cancelled";
+ ASSERT_FALSE(query_context2->is_cancelled()) << "query2 should be resumed,
not cancelled";
+}
+
+TEST_F(WorkloadGroupManagerTest, phase3_waits_for_recently_cancelled_query) {
+ auto wg = _wg_manager->get_or_create_workload_group({});
+ auto cancelled_query = _generate_on_query(wg);
+ auto waiting_query = _generate_on_query(wg);
+ auto* mock_controller =
_install_mock_query_task_controller(cancelled_query);
+
+ _wg_manager->add_paused_query(cancelled_query->resource_ctx(), 1024L,
+
Status::Error(ErrorCode::PROCESS_MEMORY_EXCEEDED, "test"));
+ _wg_manager->add_paused_query(waiting_query->resource_ctx(), 1024L,
+
Status::Error(ErrorCode::PROCESS_MEMORY_EXCEEDED, "test"));
+
+ cancelled_query->resource_ctx()->task_controller()->cancel(
+ Status::InternalError("memory gc cancel"));
+ _wg_manager->revoking_memory_from_other_query_ = true;
+
+ _wg_manager->handle_paused_queries();
+
+ {
+ std::unique_lock<std::mutex> lock(_wg_manager->_paused_queries_lock);
+ ASSERT_EQ(_wg_manager->_paused_queries_list[wg].size(), 2);
+ }
+ ASSERT_TRUE(_wg_manager->revoking_memory_from_other_query_);
+ ASSERT_TRUE(waiting_query->resource_ctx()
+ ->task_controller()
+ ->paused_reason()
+ .is<ErrorCode::PROCESS_MEMORY_EXCEEDED>());
+
+ mock_controller->set_cancelled_time(MonotonicMillis() -
config::wait_cancel_release_memory_ms -
+ 1);
+ _wg_manager->handle_paused_queries();
+
+ {
+ std::unique_lock<std::mutex> lock(_wg_manager->_paused_queries_lock);
+ ASSERT_TRUE(!_wg_manager->_paused_queries_list.contains(wg) ||
+ _wg_manager->_paused_queries_list[wg].empty());
+ }
+ ASSERT_FALSE(_wg_manager->revoking_memory_from_other_query_);
+
ASSERT_TRUE(waiting_query->resource_ctx()->task_controller()->paused_reason().ok());
+ ASSERT_FALSE(waiting_query->is_cancelled());
+}
+
+TEST_F(WorkloadGroupManagerTest, phase3_removes_expired_query_entries) {
+ auto wg = _wg_manager->get_or_create_workload_group({});
+ auto live_query = _generate_on_query(wg);
+ auto expired_query = _generate_on_query(wg);
+
+ _wg_manager->add_paused_query(live_query->resource_ctx(), 1024L,
+
Status::Error(ErrorCode::PROCESS_MEMORY_EXCEEDED, "test"));
+ _wg_manager->add_paused_query(expired_query->resource_ctx(), 1024L,
+
Status::Error(ErrorCode::PROCESS_MEMORY_EXCEEDED, "test"));
+
+ {
+ std::unique_lock<std::mutex> lock(_wg_manager->_paused_queries_lock);
+ ASSERT_EQ(_wg_manager->_paused_queries_list[wg].size(), 2);
+ }
+
+ expired_query.reset();
+ _wg_manager->revoking_memory_from_other_query_ = true;
+ _wg_manager->handle_paused_queries();
+
+ {
+ std::unique_lock<std::mutex> lock(_wg_manager->_paused_queries_lock);
+ ASSERT_TRUE(!_wg_manager->_paused_queries_list.contains(wg) ||
+ _wg_manager->_paused_queries_list[wg].empty());
+ }
+ ASSERT_FALSE(_wg_manager->revoking_memory_from_other_query_);
+
ASSERT_TRUE(live_query->resource_ctx()->task_controller()->paused_reason().ok());
+ ASSERT_FALSE(live_query->is_cancelled());
+}
+
+// Test Fix 2 (Problem 3): A cancelled query in one WG should NOT block
+// QUERY_MEMORY_EXCEEDED queries in another WG from being processed.
+TEST_F(WorkloadGroupManagerTest,
cancelled_query_does_not_block_query_mem_exceeded) {
+ WorkloadGroupInfo wg1_info {.id = 1, .memory_limit = 1024L * 1024 * 1000};
+ WorkloadGroupInfo wg2_info {.id = 2, .memory_limit = 1024L * 1024 * 1000};
+ auto wg1 = _wg_manager->get_or_create_workload_group(wg1_info);
+ auto wg2 = _wg_manager->get_or_create_workload_group(wg2_info);
+
+ // WG1: a query that will be externally cancelled (simulating memory GC)
+ auto cancelled_query = _generate_on_query(wg1);
+ cancelled_query->query_mem_tracker()->consume(1024L * 1024 * 10);
+ _wg_manager->add_paused_query(cancelled_query->resource_ctx(), 1024L,
+
Status::Error(ErrorCode::PROCESS_MEMORY_EXCEEDED, "test"));
+
+ // Cancel the query externally (like memory GC would) — it stays in paused
list
+ cancelled_query->resource_ctx()->task_controller()->cancel(
+ Status::InternalError("memory gc cancel"));
+
+ // WG2: a query paused due to QUERY_MEMORY_EXCEEDED — should be processed
immediately
+ auto query_exceed = _generate_on_query(wg2);
+ query_exceed->resource_ctx()->memory_context()->set_mem_limit(1024 * 1024);
+ query_exceed->query_mem_tracker()->consume(1024 * 4);
+ _wg_manager->add_paused_query(query_exceed->resource_ctx(), 1024L * 1024 *
1024,
+
Status::Error(ErrorCode::QUERY_MEMORY_EXCEEDED, "test"));
+
+ // Verify both in paused list
+ {
+ std::unique_lock<std::mutex> lock(_wg_manager->_paused_queries_lock);
+ ASSERT_EQ(_wg_manager->_paused_queries_list[wg1].size(), 1);
+ ASSERT_EQ(_wg_manager->_paused_queries_list[wg2].size(), 1);
+ }
+
+ // One call to handle_paused_queries — the QUERY_MEMORY_EXCEEDED query
should be processed
+ // even though there's a recently cancelled query in wg1.
+ _wg_manager->handle_paused_queries();
+
+ {
+ std::unique_lock<std::mutex> lock(_wg_manager->_paused_queries_lock);
+ // WG2's query should have been processed (removed from paused list)
+ ASSERT_TRUE(!_wg_manager->_paused_queries_list.contains(wg2) ||
+ _wg_manager->_paused_queries_list[wg2].empty())
+ << "QUERY_MEMORY_EXCEEDED query should not be blocked by
cancelled query in "
+ "another WG";
+ }
+
+ query_exceed->query_mem_tracker()->consume(-1024 * 4);
+ cancelled_query->query_mem_tracker()->consume(-1024L * 1024 * 10);
+}
+
+TEST_F(WorkloadGroupManagerTest,
recently_cancelled_query_delays_process_mem_exceeded) {
+ WorkloadGroupInfo wg1_info {.id = 1,
+ .memory_limit = 1024L * 1024 * 1000,
+ .min_memory_percent = 10,
+ .max_memory_percent = 100};
+ WorkloadGroupInfo wg2_info {.id = 2,
+ .memory_limit = 1024L * 1024 * 1000,
+ .min_memory_percent = 10,
+ .max_memory_percent = 100};
+ auto wg1 = _wg_manager->get_or_create_workload_group(wg1_info);
+ auto wg2 = _wg_manager->get_or_create_workload_group(wg2_info);
+
+ auto cancelled_query = _generate_on_query(wg1);
+ _wg_manager->add_paused_query(cancelled_query->resource_ctx(), 1024L,
+
Status::Error(ErrorCode::PROCESS_MEMORY_EXCEEDED, "test"));
+ cancelled_query->resource_ctx()->task_controller()->cancel(
+ Status::InternalError("memory gc cancel"));
+
+ auto waiting_query = _generate_on_query(wg2);
+ waiting_query->query_mem_tracker()->consume(1024L * 1024 * 128);
+ wg2->refresh_memory_usage();
+ _wg_manager->add_paused_query(waiting_query->resource_ctx(), 1024L,
+
Status::Error(ErrorCode::PROCESS_MEMORY_EXCEEDED, "test"));
+
+ _wg_manager->handle_paused_queries();
+
+ {
+ std::unique_lock<std::mutex> lock(_wg_manager->_paused_queries_lock);
+ ASSERT_EQ(_wg_manager->_paused_queries_list[wg2].size(), 1);
+ }
+ ASSERT_TRUE(waiting_query->resource_ctx()
+ ->task_controller()
+ ->paused_reason()
+ .is<ErrorCode::PROCESS_MEMORY_EXCEEDED>());
+
+ cancelled_query.reset();
+ _wg_manager->handle_paused_queries();
+
+ {
+ std::unique_lock<std::mutex> lock(_wg_manager->_paused_queries_lock);
+ ASSERT_TRUE(!_wg_manager->_paused_queries_list.contains(wg2) ||
+ _wg_manager->_paused_queries_list[wg2].empty());
+ }
+
ASSERT_TRUE(waiting_query->resource_ctx()->task_controller()->paused_reason().ok());
+ ASSERT_FALSE(waiting_query->is_cancelled());
+ waiting_query->query_mem_tracker()->consume(-1024L * 1024 * 128);
+}
+
+// Test Fix 3: update_queries_limit_ should restore mem_limit when memory
pressure eases.
+// For NONE policy, the old code never called set_mem_limit during refresh
(user_set > user_set
+// is always false), so a limit lowered by handle_paused_queries would never
recover.
+TEST_F(WorkloadGroupManagerTest,
update_queries_limit_restores_limit_none_policy) {
+ WorkloadGroupInfo wg_info {.id = 1,
+ .memory_limit = 1024L * 1024 * 200,
+ .slot_mem_policy = TWgSlotMemoryPolicy::NONE};
+ auto wg = _wg_manager->get_or_create_workload_group(wg_info);
+ auto query_context = _generate_on_query(wg);
+
+ // user_set_mem_limit is set in QueryContext init =
query_options.mem_limit = 128MB
+ const int64_t user_set =
query_context->resource_ctx()->memory_context()->user_set_mem_limit();
+ ASSERT_EQ(user_set, 1024L * 1024 * 128);
+
+ // Simulate handle_paused_queries lowering the limit to a small value
+ query_context->resource_ctx()->memory_context()->set_mem_limit(1024L *
1024 * 2); // 2MB
+ ASSERT_EQ(query_context->resource_ctx()->memory_context()->mem_limit(),
1024L * 1024 * 2);
+
+ // Now simulate memory recovery: WG memory is well below watermark
+ // refresh_workload_group_memory_state calls update_queries_limit_(wg,
false)
+ wg->refresh_memory_usage();
+ _wg_manager->refresh_workload_group_memory_state();
+
+ // The limit should be restored to user_set_mem_limit (128MB),
+ // because query_weighted = min(user_set, wg_mem_limit) = min(128MB,
200MB) = 128MB
+ // effective = min(user_set, query_weighted) = 128MB
+ ASSERT_EQ(query_context->resource_ctx()->memory_context()->mem_limit(),
user_set)
+ << "NONE policy: mem_limit should be restored to
user_set_mem_limit after memory "
+ "recovery";
+}
+
+// Test Fix 3: For DYNAMIC policy, when memory pressure eases (below low
watermark),
+// query_weighted_mem_limit = wg_high_water_mark which is typically >
user_set_mem_limit.
+// The old code's condition (user_set > query_weighted) would be false,
preventing restoration.
+TEST_F(WorkloadGroupManagerTest,
update_queries_limit_restores_limit_dynamic_policy) {
+ WorkloadGroupInfo wg_info {.id = 1,
+ .memory_limit = 1024L * 1024 * 200,
+ .memory_low_watermark = 80,
+ .memory_high_watermark = 95,
+ .total_query_slot_count = 5,
+ .slot_mem_policy =
TWgSlotMemoryPolicy::DYNAMIC};
+ auto wg = _wg_manager->get_or_create_workload_group(wg_info);
+ auto query_context = _generate_on_query(wg);
+
+ const int64_t user_set =
query_context->resource_ctx()->memory_context()->user_set_mem_limit();
+ ASSERT_EQ(user_set, 1024L * 1024 * 128);
+
+ // Simulate: under memory pressure, limit was lowered by
handle_paused_queries
+ query_context->resource_ctx()->memory_context()->set_mem_limit(1024L *
1024 * 2); // 2MB
+ ASSERT_EQ(query_context->resource_ctx()->memory_context()->mem_limit(),
1024L * 1024 * 2);
+
+ // Memory recovers: no consumption, well below low watermark
+ wg->refresh_memory_usage();
+ _wg_manager->refresh_workload_group_memory_state();
+
+ // DYNAMIC: below low watermark → query_weighted = wg_high_water_mark =
200MB * 95% = 190MB
+ // effective = min(user_set=128MB, 190MB) = 128MB
+ ASSERT_EQ(query_context->resource_ctx()->memory_context()->mem_limit(),
user_set)
+ << "DYNAMIC policy: mem_limit should be restored to
user_set_mem_limit after memory "
+ "recovery";
+}
+
+// Test Fix 3: For FIXED policy, limit should be correctly set to
slot-weighted value.
+// This already worked before the fix, but verify it still works.
+TEST_F(WorkloadGroupManagerTest,
update_queries_limit_restores_limit_fixed_policy) {
+ WorkloadGroupInfo wg_info {.id = 1,
+ .memory_limit = 1024L * 1024 * 200,
+ .memory_low_watermark = 80,
+ .memory_high_watermark = 95,
+ .total_query_slot_count = 5,
+ .slot_mem_policy = TWgSlotMemoryPolicy::FIXED};
+ auto wg = _wg_manager->get_or_create_workload_group(wg_info);
+ auto query_context = _generate_on_query(wg);
+
+ // Simulate lowered limit
+ query_context->resource_ctx()->memory_context()->set_mem_limit(1024L *
1024 * 2); // 2MB
+
+ wg->refresh_memory_usage();
+ _wg_manager->refresh_workload_group_memory_state();
+
+ // FIXED: query_weighted = wg_high_water_mark * my_slot / total_slot
+ // = 200MB * 95% * 1 / 5 = 38MB
+ // effective = min(user_set=128MB, 38MB) = 38MB
+ const int64_t expected = (int64_t)((double)(1024L * 1024 * 200) * 95.0 /
100 / 5);
+ const auto delta =
+
std::abs(query_context->resource_ctx()->memory_context()->mem_limit() -
expected);
+ ASSERT_LE(delta, 1) << "FIXED policy: mem_limit should be restored to
slot-weighted value, got "
+ <<
query_context->resource_ctx()->memory_context()->mem_limit()
+ << " expected " << expected;
+}
+
+// Test: When WG concurrency decreases (queries finish), remaining queries
should get
+// higher per-query limits in FIXED policy.
+TEST_F(WorkloadGroupManagerTest, limit_increases_when_concurrency_decreases) {
+ WorkloadGroupInfo wg_info {.id = 1,
+ .memory_limit = 1024L * 1024 * 200,
+ .memory_low_watermark = 80,
+ .memory_high_watermark = 95,
+ .total_query_slot_count = 5,
+ .slot_mem_policy = TWgSlotMemoryPolicy::FIXED};
+ auto wg = _wg_manager->get_or_create_workload_group(wg_info);
+
+ // Start 3 queries (each with slot_count = 1, total_used_slot = 3)
+ auto q1 = _generate_on_query(wg);
+ auto q2 = _generate_on_query(wg);
+ auto q3 = _generate_on_query(wg);
+
+ wg->refresh_memory_usage();
+ _wg_manager->refresh_workload_group_memory_state();
+
+ // FIXED: wg_high_water_mark * 1 / 5 = 200MB * 95% / 5 = 38MB
+ // (total_slot_count is configured as 5, not actual used slots for FIXED)
+ int64_t limit_with_3_queries =
q1->resource_ctx()->memory_context()->mem_limit();
+
+ // Now q2 and q3 finish — remove from WG
+ q2.reset();
+ q3.reset();
+ wg->clear_cancelled_resource_ctx();
+ wg->refresh_memory_usage();
+ _wg_manager->refresh_workload_group_memory_state();
+
+ int64_t limit_with_1_query =
q1->resource_ctx()->memory_context()->mem_limit();
+
+ // For FIXED with total_query_slot_count=5, the slot-weighted limit
doesn't change
+ // when queries leave (it's based on configured total, not actual).
+ // But the limit should at least be correctly restored, not stuck at a low
value.
+ ASSERT_EQ(limit_with_1_query, limit_with_3_queries)
+ << "FIXED policy with configured total_slot_count: limit should
remain stable";
+}
+
+// Test: Cancelled queries that have exceeded wait_cancel_release_memory_ms
should be
+// cleaned up in Phase 2, not left in the list for Phase 4 processing.
+TEST_F(WorkloadGroupManagerTest, phase2_removes_old_cancelled_queries) {
+ auto wg = _wg_manager->get_or_create_workload_group({});
+ auto cancelled_query = _generate_on_query(wg);
+ auto live_query = _generate_on_query(wg);
+ auto* mock_controller =
_install_mock_query_task_controller(cancelled_query);
+
+ // Pause both queries
+ _wg_manager->add_paused_query(cancelled_query->resource_ctx(), 1024L,
+
Status::Error(ErrorCode::PROCESS_MEMORY_EXCEEDED, "test"));
+ _wg_manager->add_paused_query(live_query->resource_ctx(), 1024L,
+
Status::Error(ErrorCode::PROCESS_MEMORY_EXCEEDED, "test"));
+
+ // Cancel the query and make it look old (exceeded wait time)
+ cancelled_query->resource_ctx()->task_controller()->cancel(
+ Status::InternalError("memory gc cancel"));
+ mock_controller->set_cancelled_time(MonotonicMillis() -
config::wait_cancel_release_memory_ms -
+ 1);
+
+ {
+ std::unique_lock<std::mutex> lock(_wg_manager->_paused_queries_lock);
+ ASSERT_EQ(_wg_manager->_paused_queries_list[wg].size(), 2);
+ }
+
+ // One call — Phase 2 should remove the old-cancelled query.
+ _wg_manager->handle_paused_queries();
+
+ {
+ std::unique_lock<std::mutex> lock(_wg_manager->_paused_queries_lock);
+ // The old-cancelled query should have been removed in Phase 2.
+ // The live query may still be in the list (Phase 4 doesn't always
process it in
+ // one call, depending on WG memory state), but the count should be at
most 1.
+ size_t remaining = _wg_manager->_paused_queries_list.contains(wg)
+ ?
_wg_manager->_paused_queries_list[wg].size()
+ : 0;
+ ASSERT_LE(remaining, 1) << "Old-cancelled query should have been
removed in Phase 2, "
+ "at most the live query remains";
+ }
+ ASSERT_FALSE(live_query->is_cancelled()) << "live query should not be
cancelled";
+}
+
+// Test: Phase 3 should not call set_memory_sufficient on cancelled queries —
+// just erase them and only resume live queries.
+TEST_F(WorkloadGroupManagerTest, phase3_skips_cancelled_queries_on_resume) {
+ auto wg = _wg_manager->get_or_create_workload_group({});
+ auto cancelled_query = _generate_on_query(wg);
+ auto live_query = _generate_on_query(wg);
+ auto* mock_controller =
_install_mock_query_task_controller(cancelled_query);
+
+ _wg_manager->add_paused_query(cancelled_query->resource_ctx(), 1024L,
+
Status::Error(ErrorCode::PROCESS_MEMORY_EXCEEDED, "test"));
+ _wg_manager->add_paused_query(live_query->resource_ctx(), 1024L,
+
Status::Error(ErrorCode::PROCESS_MEMORY_EXCEEDED, "test"));
+
+ // Cancel the query but make it recently cancelled so Phase 2 keeps it
+ cancelled_query->resource_ctx()->task_controller()->cancel(
+ Status::InternalError("memory gc cancel"));
+ _wg_manager->revoking_memory_from_other_query_ = true;
+
+ // First call: Phase 3 waits because recently cancelled
+ _wg_manager->handle_paused_queries();
+ ASSERT_TRUE(_wg_manager->revoking_memory_from_other_query_);
+
+ // Make cancellation old
+ mock_controller->set_cancelled_time(MonotonicMillis() -
config::wait_cancel_release_memory_ms -
+ 1);
+
+ // Second call: Phase 2 removes old-cancelled query, Phase 3 resumes live
query
+ _wg_manager->handle_paused_queries();
+
+ {
+ std::unique_lock<std::mutex> lock(_wg_manager->_paused_queries_lock);
+ ASSERT_TRUE(!_wg_manager->_paused_queries_list.contains(wg) ||
+ _wg_manager->_paused_queries_list[wg].empty())
+ << "All queries should be removed";
+ }
+ ASSERT_FALSE(_wg_manager->revoking_memory_from_other_query_);
+ // Live query should be properly resumed
+
ASSERT_TRUE(live_query->resource_ctx()->task_controller()->paused_reason().ok());
+ ASSERT_FALSE(live_query->is_cancelled());
+}
+
+// Test: A cancelled query in WG1 should NOT block
WORKLOAD_GROUP_MEMORY_EXCEEDED
+// queries in WG2, because WG memory pools are independent. Only same-WG
queries
+// should be delayed. PROCESS_MEMORY_EXCEEDED should still be delayed globally.
+TEST_F(WorkloadGroupManagerTest,
cancelled_query_does_not_block_cross_wg_mem_exceeded) {
+ WorkloadGroupInfo wg1_info {.id = 1, .memory_limit = 1024L * 1024 * 1000};
+ WorkloadGroupInfo wg2_info {.id = 2, .memory_limit = 1024L * 1024 * 1000};
+ auto wg1 = _wg_manager->get_or_create_workload_group(wg1_info);
+ auto wg2 = _wg_manager->get_or_create_workload_group(wg2_info);
+
+ // WG1: a recently cancelled query
+ auto cancelled_query = _generate_on_query(wg1);
+ cancelled_query->query_mem_tracker()->consume(1024L * 1024 * 10);
+ _wg_manager->add_paused_query(cancelled_query->resource_ctx(), 1024L,
+
Status::Error(ErrorCode::WORKLOAD_GROUP_MEMORY_EXCEEDED, "test"));
+ cancelled_query->resource_ctx()->task_controller()->cancel(
+ Status::InternalError("memory gc cancel"));
+
+ // WG2: a query paused due to WORKLOAD_GROUP_MEMORY_EXCEEDED — should NOT
be
+ // blocked by WG1's cancelled query since WG memory pools are independent.
+ auto wg2_query = _generate_on_query(wg2);
+ wg2_query->query_mem_tracker()->consume(1024L * 1024 * 4);
+ wg2_query->resource_ctx()->memory_context()->set_adjusted_mem_limit(1024L
* 1024 * 10);
+ _wg_manager->add_paused_query(wg2_query->resource_ctx(), 1024L,
+
Status::Error(ErrorCode::WORKLOAD_GROUP_MEMORY_EXCEEDED, "test"));
+
+ {
+ std::unique_lock<std::mutex> lock(_wg_manager->_paused_queries_lock);
+ ASSERT_EQ(_wg_manager->_paused_queries_list[wg1].size(), 1);
+ ASSERT_EQ(_wg_manager->_paused_queries_list[wg2].size(), 1);
+ }
+
+ _wg_manager->handle_paused_queries();
+
+ {
+ std::unique_lock<std::mutex> lock(_wg_manager->_paused_queries_lock);
+ // WG2's query should have been processed (not blocked by WG1's
cancellation)
+ ASSERT_TRUE(!_wg_manager->_paused_queries_list.contains(wg2) ||
+ _wg_manager->_paused_queries_list[wg2].empty())
+ << "Cross-WG WORKLOAD_GROUP_MEMORY_EXCEEDED should not be
blocked by "
+ "cancelled query in another WG";
+ }
+
+ wg2_query->query_mem_tracker()->consume(-1024L * 1024 * 4);
+ cancelled_query->query_mem_tracker()->consume(-1024L * 1024 * 10);
+}
+
+// Test: A recently-cancelled QUERY_MEMORY_EXCEEDED query should NOT be
processed
+// by handle_single_query_() in Phase 4, and should NOT incorrectly set
+// revoking_memory_from_other_query_. Phase 2 keeps it for delay-waiting;
+// Phase 4 must skip it.
+TEST_F(WorkloadGroupManagerTest, phase4_skips_cancelled_query_memory_exceeded)
{
+ auto wg = _wg_manager->get_or_create_workload_group({});
+ auto cancelled_query = _generate_on_query(wg);
+ auto live_query = _generate_on_query(wg);
+
+ // Pause both for QUERY_MEMORY_EXCEEDED
+ cancelled_query->resource_ctx()->memory_context()->set_mem_limit(1024 *
1024);
+ cancelled_query->query_mem_tracker()->consume(1024 * 4);
+ _wg_manager->add_paused_query(cancelled_query->resource_ctx(), 1024L *
1024 * 1024,
+
Status::Error(ErrorCode::QUERY_MEMORY_EXCEEDED, "test"));
+
+ live_query->resource_ctx()->memory_context()->set_mem_limit(1024 * 1024);
+ live_query->query_mem_tracker()->consume(1024 * 4);
+ _wg_manager->add_paused_query(live_query->resource_ctx(), 1024L * 1024 *
1024,
+
Status::Error(ErrorCode::QUERY_MEMORY_EXCEEDED, "test"));
+
+ // Cancel one query externally (simulating memory GC) — it's recently
cancelled
+ cancelled_query->resource_ctx()->task_controller()->cancel(
+ Status::InternalError("memory gc cancel"));
+
+ {
+ std::unique_lock<std::mutex> lock(_wg_manager->_paused_queries_lock);
+ ASSERT_EQ(_wg_manager->_paused_queries_list[wg].size(), 2);
+ }
+
+ _wg_manager->handle_paused_queries();
+
+ // The cancelled query should NOT have caused
revoking_memory_from_other_query_ to be set.
+ // If Phase 4 incorrectly processed it through handle_single_query_(), the
is_cancelled()
+ // check afterward would set this flag.
+ ASSERT_FALSE(_wg_manager->revoking_memory_from_other_query_)
+ << "revoking flag should NOT be set by an already-cancelled query";
+
+ cancelled_query->query_mem_tracker()->consume(-1024 * 4);
+ live_query->query_mem_tracker()->consume(-1024 * 4);
+}
+
} // namespace doris
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]