This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch branch-4.0
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-4.0 by this push:
     new 63d936f32fa branch-4.0: [enhancement](spilldisk)Cancel query fast when 
reserver memory failed and could not find revocable tasks #59330 (#59440)
63d936f32fa is described below

commit 63d936f32fa6adaaa0d98d222b17d17138266233
Author: github-actions[bot] 
<41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Tue Dec 30 14:09:37 2025 +0800

    branch-4.0: [enhancement](spilldisk)Cancel query fast when reserver memory 
failed and could not find revocable tasks #59330 (#59440)
    
    Cherry-picked from #59330
    
    Co-authored-by: yiguolei <[email protected]>
---
 be/src/pipeline/pipeline_task.cpp                  |  34 +-
 be/src/runtime/memory/mem_tracker_limiter.h        |   3 +-
 .../workload_group/workload_group_manager.cpp      | 388 +++++++++------------
 .../workload_management/query_task_controller.cpp  |   9 +
 .../workload_management/query_task_controller.h    |   1 +
 .../runtime/workload_management/task_controller.h  |   2 +-
 be/test/pipeline/pipeline_task_test.cpp            | 200 ++++++++++-
 be/test/pipeline/thrift_builder.h                  |   5 +
 .../workload_group/workload_group_manager_test.cpp |   4 +-
 gensrc/thrift/PaloInternalService.thrift           |   2 +-
 10 files changed, 405 insertions(+), 243 deletions(-)

diff --git a/be/src/pipeline/pipeline_task.cpp 
b/be/src/pipeline/pipeline_task.cpp
index bf787f05105..dbe52cbabc0 100644
--- a/be/src/pipeline/pipeline_task.cpp
+++ b/be/src/pipeline/pipeline_task.cpp
@@ -643,6 +643,14 @@ Status PipelineTask::do_revoke_memory(const 
std::shared_ptr<SpillContext>& spill
 
 bool PipelineTask::_try_to_reserve_memory(const size_t reserve_size, 
OperatorBase* op) {
     auto st = 
thread_context()->thread_mem_tracker_mgr->try_reserve(reserve_size);
+    // If reserve memory failed and the query is not enable spill, just 
disable reserve memory(this will enable
+    // memory hard limit check, and will cancel the query if allocate memory 
failed) and let it run.
+    if (!st.ok() && !_state->enable_spill()) {
+        LOG(INFO) << print_id(_query_id) << " reserve memory failed due to " 
<< st
+                  << ", and it is not enable spill, disable reserve memory and 
let it run";
+        
_state->get_query_ctx()->resource_ctx()->task_controller()->disable_reserve_memory();
+        return true;
+    }
     COUNTER_UPDATE(_memory_reserve_times, 1);
     auto sink_revocable_mem_size = _sink->revocable_mem_size(_state);
     if (st.ok() && _state->enable_force_spill() && _sink->is_spillable() &&
@@ -659,13 +667,30 @@ bool PipelineTask::_try_to_reserve_memory(const size_t 
reserve_size, OperatorBas
                 op->node_id(), _state->task_id(),
                 PrettyPrinter::print_bytes(op->revocable_mem_size(_state)),
                 PrettyPrinter::print_bytes(sink_revocable_mem_size), 
st.to_string());
-        // PROCESS_MEMORY_EXCEEDED error msg alread contains 
process_mem_log_str
+        // PROCESS_MEMORY_EXCEEDED error msg already contains 
process_mem_log_str
         if (!st.is<ErrorCode::PROCESS_MEMORY_EXCEEDED>()) {
             debug_msg +=
                     fmt::format(", debug info: {}", 
GlobalMemoryArbitrator::process_mem_log_str());
         }
-        LOG_EVERY_N(INFO, 100) << debug_msg;
         // If sink has enough revocable memory, trigger revoke memory
+        LOG(INFO) << fmt::format(
+                "Query: {} sink: {}, node id: {}, task id: "
+                "{}, revocable mem size: {}",
+                print_id(_query_id), _sink->get_name(), _sink->node_id(), 
_state->task_id(),
+                PrettyPrinter::print_bytes(sink_revocable_mem_size));
+        ExecEnv::GetInstance()->workload_group_mgr()->add_paused_query(
+                _state->get_query_ctx()->resource_ctx()->shared_from_this(), 
reserve_size, st);
+        _spilling = true;
+        return false;
+        // !!! Attention:
+        // In the past, if reserve failed, not add this query to paused list, 
because it is very small, will not
+        // consume a lot of memory. But need set low memory mode to indicate 
that the system should
+        // not use too much memory.
+        // But if we only set _state->get_query_ctx()->set_low_memory_mode() 
here, and return true, the query will
+        // continue to run and not blocked, and this reserve maybe the last 
block of join sink opertorator, and it will
+        // build hash table directly and will consume a lot of memory. So that 
should return false directly.
+        // TODO: we should using a global system buffer management logic to 
deal with low memory mode.
+        /**
         if (sink_revocable_mem_size >= 
vectorized::SpillStream::MIN_SPILL_WRITE_BATCH_MEM) {
             LOG(INFO) << fmt::format(
                     "Query: {} sink: {}, node id: {}, task id: "
@@ -677,11 +702,8 @@ bool PipelineTask::_try_to_reserve_memory(const size_t 
reserve_size, OperatorBas
             _spilling = true;
             return false;
         } else {
-            // If reserve failed, not add this query to paused list, because 
it is very small, will not
-            // consume a lot of memory. But need set low memory mode to 
indicate that the system should
-            // not use too much memory.
             _state->get_query_ctx()->set_low_memory_mode();
-        }
+        } */
     }
     return true;
 }
diff --git a/be/src/runtime/memory/mem_tracker_limiter.h 
b/be/src/runtime/memory/mem_tracker_limiter.h
index da9285e5255..fd0418f4826 100644
--- a/be/src/runtime/memory/mem_tracker_limiter.h
+++ b/be/src/runtime/memory/mem_tracker_limiter.h
@@ -126,7 +126,6 @@ public:
     int64_t group_num() const { return _group_num; }
     int64_t limit() const { return _limit; }
     void set_limit(int64_t new_mem_limit) { _limit = new_mem_limit; }
-    bool enable_check_limit() const { return _enable_check_limit; }
     void set_enable_check_limit(bool enable_check_limit) {
         _enable_check_limit = enable_check_limit;
     }
@@ -298,7 +297,7 @@ inline void MemTrackerLimiter::cache_consume(int64_t bytes) 
{
 }
 
 inline Status MemTrackerLimiter::check_limit(int64_t bytes) {
-    if (bytes <= 0 || !enable_check_limit() || _limit <= 0) {
+    if (bytes <= 0 || !_enable_check_limit || _limit <= 0) {
         return Status::OK();
     }
 
diff --git a/be/src/runtime/workload_group/workload_group_manager.cpp 
b/be/src/runtime/workload_group/workload_group_manager.cpp
index 5b2fd2efd6c..f671b7a6e14 100644
--- a/be/src/runtime/workload_group/workload_group_manager.cpp
+++ b/be/src/runtime/workload_group/workload_group_manager.cpp
@@ -346,14 +346,33 @@ void WorkloadGroupMgr::handle_paused_queries() {
         }
     }
 
-    bool has_query_exceed_process_memlimit = false;
+    // In previous loop, some query is cancelled, and now there is no query in 
cancel list. Resume all paused queries.
+    if (revoking_memory_from_other_query_) {
+        for (auto it = _paused_queries_list.begin(); it != 
_paused_queries_list.end(); ++it) {
+            auto& queries_list = it->second;
+            for (auto query_it = queries_list.begin(); query_it != 
queries_list.end(); ++query_it) {
+                auto resource_ctx = query_it->resource_ctx_.lock();
+                // The query is finished during in paused list.
+                if (resource_ctx == nullptr) {
+                    LOG(INFO) << "Query: " << query_it->query_id() << " is 
nullptr, erase it.";
+                    continue;
+                }
+                LOG(INFO) << "Query " << 
print_id(resource_ctx->task_controller()->task_id())
+                          << " is blocked due to process memory not enough, 
but already "
+                             "cancelled some queries, resumt it now.";
+                resource_ctx->task_controller()->set_memory_sufficient(true);
+            }
+        }
+        revoking_memory_from_other_query_ = false;
+    }
+
     for (auto it = _paused_queries_list.begin(); it != 
_paused_queries_list.end();) {
         auto& queries_list = it->second;
         auto query_count = queries_list.size();
         const auto& wg = it->first;
 
         if (query_count != 0) {
-            LOG_EVERY_T(INFO, 1) << "Paused queries count of wg " << 
wg->name() << ": "
+            LOG_EVERY_T(INFO, 1) << "Paused queries count of workload group " 
<< wg->name() << ": "
                                  << query_count;
         }
 
@@ -388,6 +407,11 @@ void WorkloadGroupMgr::handle_paused_queries() {
                     VLOG_DEBUG << "Query: " << 
print_id(resource_ctx->task_controller()->task_id())
                                << " remove from paused list";
                     query_it = queries_list.erase(query_it);
+                    // The query is cancelled, just break. And wait for the 
query to release the memory. Other query maybe not need spill.
+                    if (resource_ctx->task_controller()->is_cancelled()) {
+                        revoking_memory_from_other_query_ = true;
+                        return;
+                    }
                     continue;
                 }
             } else if (resource_ctx->task_controller()
@@ -469,6 +493,7 @@ void WorkloadGroupMgr::handle_paused_queries() {
                     bool spill_res = handle_single_query_(
                             resource_ctx, query_it->reserve_size_, 
query_it->elapsed_time(),
                             resource_ctx->task_controller()->paused_reason());
+
                     if (!spill_res) {
                         ++query_it;
                         continue;
@@ -477,6 +502,11 @@ void WorkloadGroupMgr::handle_paused_queries() {
                                 << "Query: " << 
print_id(resource_ctx->task_controller()->task_id())
                                 << " remove from paused list";
                         query_it = queries_list.erase(query_it);
+                        // The query is cancelled, just break. And wait for 
the query to release the memory. Other query maybe not need spill.
+                        if (resource_ctx->task_controller()->is_cancelled()) {
+                            revoking_memory_from_other_query_ = true;
+                            return;
+                        }
                         continue;
                     }
                 } else {
@@ -499,105 +529,41 @@ void WorkloadGroupMgr::handle_paused_queries() {
                     }
                 }
             } else {
-                if (revoking_memory_from_other_query_) {
-                    // Previously, we have revoked memory from other query, 
and the cancel stage finished.
-                    // So, resume all queries now.
-                    
resource_ctx->task_controller()->set_memory_sufficient(true);
-                    VLOG_DEBUG << "Query " << 
print_id(resource_ctx->task_controller()->task_id())
-                               << " is blocked due to process memory not 
enough, but already "
-                                  "cancelled some queries, resumt it now.";
-                    query_it = queries_list.erase(query_it);
-                    continue;
-                }
-                has_query_exceed_process_memlimit = true;
-                // If wg's memlimit not exceed, but process memory exceed, it 
means cache or other metadata
-                // used too much memory. Should clean all cache here.
-                // Clear all cache not part of cache, because the cache thread 
already try to release cache step
-                // by step. And it is not useful.
-                //
-                // here query is paused because of PROCESS_MEMORY_EXCEEDED,
-                // normally, before process memory exceeds, daemon thread 
`refresh_cache_capacity` will
-                // adjust the cache capacity to 0.
-                // but at this time, process may not actually exceed the limit,
-                // just (process memory + current query expected reserve 
memory > process memory limit)
-                // so the behavior at this time is the same as the process 
memory limit exceed, clear all cache.
-                if 
(doris::GlobalMemoryArbitrator::last_affected_cache_capacity_adjust_weighted >
-                            0.05 &&
-                    doris::GlobalMemoryArbitrator::
-                                    
last_memory_exceeded_cache_capacity_adjust_weighted > 0.05) {
-                    doris::GlobalMemoryArbitrator::
-                            
last_memory_exceeded_cache_capacity_adjust_weighted = 0.04;
-                    
doris::GlobalMemoryArbitrator::notify_cache_adjust_capacity();
-                    LOG(INFO) << "There are some queries need process memory, 
so that set cache "
-                                 "capacity to 0 now";
-                }
-
-                // `cache_ratio_ < 0.05` means that the cache has been cleared
-                // before the query enters the paused state.
-                // but the query is still paused because of process memory 
exceed,
-                // so here we will try to continue to release other memory.
-                //
-                // need to check config::disable_memory_gc here, if not, when 
config::disable_memory_gc == true,
-                // cache is not adjusted, query_it->cache_ratio_ will always 
be 1, and this if branch will nenver
-                // execute, this query will never be resumed, and will 
deadlock here.
-                if (query_it->cache_ratio_ < 0.05 || 
config::disable_memory_gc) {
-                    // If workload group's memory usage > min memory, then it 
means the workload group use too much memory
-                    // in memory contention state. Should just spill
-                    if (wg->total_mem_used() > wg->min_memory_limit()) {
-                        auto revocable_tasks =
-                                
resource_ctx->task_controller()->get_revocable_tasks();
-                        if (revocable_tasks.empty()) {
-                            Status status = Status::MemoryLimitExceeded(
-                                    "Workload group memory usage {} > min 
memory {}, but no "
-                                    "revocable tasks",
-                                    wg->total_mem_used(), 
wg->min_memory_limit());
-                            
ExecEnv::GetInstance()->fragment_mgr()->cancel_query(
-                                    
resource_ctx->task_controller()->task_id(), status);
+                // If workload group's memory usage > min memory, then it 
means the workload group use too much memory
+                // in memory contention state. Should just spill
+                if (wg->total_mem_used() > wg->min_memory_limit()) {
+                    bool spill_res = handle_single_query_(
+                            resource_ctx, query_it->reserve_size_, 
query_it->elapsed_time(),
+                            resource_ctx->task_controller()->paused_reason());
+                    if (!spill_res) {
+                        ++query_it;
+                        continue;
+                    } else {
+                        VLOG_DEBUG
+                                << "Query: " << 
print_id(resource_ctx->task_controller()->task_id())
+                                << " remove from paused list";
+                        query_it = queries_list.erase(query_it);
+                        // The query is cancelled, just break. And wait for 
the query to release the memory. Other query maybe not need spill.
+                        if (resource_ctx->task_controller()->is_cancelled()) {
                             revoking_memory_from_other_query_ = true;
-                            // If any query is cancelled, then skip others 
because it will release many memory and
-                            // other query may not need release memory.
-                            return;
-                        } else {
-                            SCOPED_ATTACH_TASK(resource_ctx);
-                            auto status = 
resource_ctx->task_controller()->revoke_memory();
-                            if (!status.ok()) {
-                                
ExecEnv::GetInstance()->fragment_mgr()->cancel_query(
-                                        
resource_ctx->task_controller()->task_id(), status);
-                                revoking_memory_from_other_query_ = true;
-                                return;
-                            }
-                            query_it = queries_list.erase(query_it);
-                            continue;
                         }
-                    }
-
-                    // Other workload groups many use a lot of memory, should 
revoke memory from other workload groups
-                    // by cancelling their queries.
-                    int64_t revoked_size = revoke_memory_from_other_groups_();
-                    if (revoked_size > 0) {
-                        // Revoke memory from other workload groups will 
cancel some queries, wait them cancel finished
-                        // and then check it again.
-                        revoking_memory_from_other_query_ = true;
+                        // If any query is cancelled or spilled to disk, we 
need to stop and not revoke memory from other queries.
                         return;
                     }
-
-                    // TODO revoke from memtable
                 }
-                // `cache_ratio_ > 0.05` means that the cache has not been 
cleared
-                // when the query enters the paused state.
-                // `last_affected_cache_capacity_adjust_weighted < 0.05` means 
that
-                // the cache has been cleared at this time.
-                // this means that the cache has been cleaned after the query 
enters the paused state.
-                // assuming that some memory has been released, wake up the 
query to continue execution.
-                if 
(doris::GlobalMemoryArbitrator::last_affected_cache_capacity_adjust_weighted <
-                            0.05 &&
-                    query_it->cache_ratio_ > 0.05) {
-                    LOG(INFO) << "Query: " << 
print_id(resource_ctx->task_controller()->task_id())
-                              << " will be resume after cache adjust.";
-                    
resource_ctx->task_controller()->set_memory_sufficient(true);
-                    query_it = queries_list.erase(query_it);
-                    continue;
+
+                // Other workload groups many use a lot of memory, should 
revoke memory from other workload groups
+                // by cancelling their queries.
+                int64_t revoked_size = revoke_memory_from_other_groups_();
+                if (revoked_size > 0) {
+                    // Revoke memory from other workload groups will cancel 
some queries, wait them cancel finished
+                    // and then check it again.
+                    revoking_memory_from_other_query_ = true;
+                    return;
                 }
+
+                // TODO revoke from memtable
+
                 ++query_it;
             }
         }
@@ -612,21 +578,6 @@ void WorkloadGroupMgr::handle_paused_queries() {
             ++it;
         }
     }
-    // Attention: has to be here. It means, no query is at cancelling state 
and all query blocked by process
-    // not enough has been resumed.
-    revoking_memory_from_other_query_ = false;
-
-    if (!has_query_exceed_process_memlimit &&
-        
doris::GlobalMemoryArbitrator::last_memory_exceeded_cache_capacity_adjust_weighted
 < 0.05) {
-        // No query paused due to process exceed limit, so that enable cache 
now.
-        
doris::GlobalMemoryArbitrator::last_memory_exceeded_cache_capacity_adjust_weighted
 =
-                doris::GlobalMemoryArbitrator::
-                        
last_periodic_refreshed_cache_capacity_adjust_weighted.load(
-                                std::memory_order_relaxed);
-        doris::GlobalMemoryArbitrator::notify_cache_adjust_capacity();
-        LOG(INFO) << "No query was paused due to insufficient process memory, 
so that set cache "
-                     "capacity to 
last_periodic_refreshed_cache_capacity_adjust_weighted now";
-    }
 }
 
 // Find the workload group that could revoke lot of memory:
@@ -688,6 +639,9 @@ int64_t 
WorkloadGroupMgr::revoke_memory_from_other_groups_() {
 // If the query could release some memory, for example, spill disk, then the 
return value is true.
 // If the query could not release memory, then cancel the query, the return 
value is true.
 // If the query is not ready to do these tasks, it means just wait, then 
return value is false.
+// Return value:
+// true: the query is spilled or be cancelled. The manager should remove it 
from paused queries list.
+// false: the query is not ready to do these tasks. The manager should 
continue to wait.
 bool WorkloadGroupMgr::handle_single_query_(const 
std::shared_ptr<ResourceContext>& requestor,
                                             size_t size_to_reserve, int64_t 
time_in_queue,
                                             Status paused_reason) {
@@ -705,125 +659,111 @@ bool WorkloadGroupMgr::handle_single_query_(const 
std::shared_ptr<ResourceContex
 
     const auto wg = requestor->workload_group();
     auto revocable_tasks = requestor->task_controller()->get_revocable_tasks();
-    if (revocable_tasks.empty()) {
-        const auto limit = requestor->memory_context()->mem_limit();
-        const auto reserved_size = 
requestor->memory_context()->reserved_consumption();
-        if (paused_reason.is<ErrorCode::QUERY_MEMORY_EXCEEDED>()) {
-            // During waiting time, another operator in the query may finished 
and release
-            // many memory and we could run.
-            if ((memory_usage + size_to_reserve) < limit) {
-                LOG(INFO) << "Query: " << query_id << ", usage("
-                          << PrettyPrinter::print_bytes(memory_usage) << " + " 
<< size_to_reserve
-                          << ") less than limit(" << 
PrettyPrinter::print_bytes(limit)
-                          << "), resume it.";
-                requestor->task_controller()->set_memory_sufficient(true);
-                return true;
-            } else if (time_in_queue >= 
config::spill_in_paused_queue_timeout_ms) {
-                // if cannot find any memory to release, then let the query 
continue to run as far as possible.
-                // after `disable_reserve_memory`, the query will not enter 
the paused state again,
-                // if the memory is really insufficient, Allocator will throw 
an exception
-                // of query memory limit exceed and the query will be canceled,
-                // or it will be canceled by memory gc when the process memory 
exceeds the limit.
-                auto log_str = fmt::format(
-                        "Query {} memory limit is exceeded, but could "
-                        "not find memory that could release or spill to disk, 
disable reserve "
-                        "memory and resume it. Query memory usage: "
-                        "{}, limit: {}, reserved "
-                        "size: {}, try to reserve: {}, wg info: {}. {}",
-                        query_id, PrettyPrinter::print_bytes(memory_usage),
-                        PrettyPrinter::print_bytes(limit),
-                        PrettyPrinter::print_bytes(reserved_size),
-                        PrettyPrinter::print_bytes(size_to_reserve), 
wg->memory_debug_string(),
-                        doris::ProcessProfile::instance()
-                                ->memory_profile()
-                                ->process_memory_detail_str());
-                LOG_LONG_STRING(INFO, log_str);
-                // Disable reserve memory will enable query level memory 
check, if the query
-                // need a lot of memory than the memory limit, it will be 
killed.
-                // Do not need set memlimit = ajusted_mem_limit because 
workload group refresher thread
-                // will update automatically.
-                requestor->task_controller()->disable_reserve_memory();
-                requestor->task_controller()->set_memory_sufficient(true);
-                return true;
-            } else {
-                return false;
-            }
-        } else if 
(paused_reason.is<ErrorCode::WORKLOAD_GROUP_MEMORY_EXCEEDED>()) {
-            if (!wg->exceed_limit()) {
-                LOG(INFO) << "Query: " << query_id
-                          << " paused caused by 
WORKLOAD_GROUP_MEMORY_EXCEEDED, now resume it.";
-                requestor->task_controller()->set_memory_sufficient(true);
-                return true;
-            } else if (time_in_queue > 
config::spill_in_paused_queue_timeout_ms) {
-                // if cannot find any memory to release, then let the query 
continue to run as far as possible
-                // or cancelled by gc if memory is really not enough.
-                auto log_str = fmt::format(
-                        "Query {} workload group memory is exceeded"
-                        ", and there is no cache now. And could not find task 
to spill, disable "
-                        "reserve memory and resume it. "
-                        "Query memory usage: {}, limit: {}, reserved "
-                        "size: {}, try to reserve: {}, wg info: {}."
-                        " Maybe you should set the workload group's limit to a 
lower value. {}",
-                        query_id, PrettyPrinter::print_bytes(memory_usage),
-                        PrettyPrinter::print_bytes(limit),
-                        PrettyPrinter::print_bytes(reserved_size),
-                        PrettyPrinter::print_bytes(size_to_reserve), 
wg->memory_debug_string(),
-                        doris::ProcessProfile::instance()
-                                ->memory_profile()
-                                ->process_memory_detail_str());
-                LOG_LONG_STRING(INFO, log_str);
-                requestor->task_controller()->disable_reserve_memory();
-                requestor->task_controller()->set_memory_sufficient(true);
-                return true;
-            } else {
-                return false;
-            }
-        } else {
-            // Should not consider about process memory. For example, the 
query's limit is 100g, workload
-            // group's memlimit is 10g, process memory is 20g. The query 
reserve will always failed in wg
-            // limit, and process is always have memory, so that it will 
resume and failed reserve again.
-            const size_t test_memory_size = std::max<size_t>(size_to_reserve, 
32L * 1024 * 1024);
-            if 
(!GlobalMemoryArbitrator::is_exceed_soft_mem_limit(test_memory_size)) {
-                LOG(INFO) << "Query: " << query_id
-                          << ", process limit not exceeded now, resume this 
query"
-                          << ", process memory info: "
-                          << 
GlobalMemoryArbitrator::process_memory_used_details_str()
-                          << ", wg info: " << wg->debug_string();
-                requestor->task_controller()->set_memory_sufficient(true);
-                return true;
-            } else if (time_in_queue > 
config::spill_in_paused_queue_timeout_ms) {
-                // if cannot find any memory to release, then let the query 
continue to run as far as possible
-                // or cancelled by gc if memory is really not enough.
-                auto log_str = fmt::format(
-                        "Query {} process memory is exceeded"
-                        ", and there is no cache now. And could not find task 
to spill, disable "
-                        "reserve memory and resume it. "
-                        "Query memory usage: {}, limit: {}, reserved "
-                        "size: {}, try to reserve: {}, wg info: {}."
-                        " Maybe you should set the workload group's limit to a 
lower value. {}",
-                        query_id, PrettyPrinter::print_bytes(memory_usage),
-                        PrettyPrinter::print_bytes(limit),
-                        PrettyPrinter::print_bytes(reserved_size),
-                        PrettyPrinter::print_bytes(size_to_reserve), 
wg->memory_debug_string(),
-                        doris::ProcessProfile::instance()
-                                ->memory_profile()
-                                ->process_memory_detail_str());
-                LOG_LONG_STRING(INFO, log_str);
-                requestor->task_controller()->disable_reserve_memory();
-                requestor->task_controller()->set_memory_sufficient(true);
-            } else {
-                return false;
-            }
-        }
-    } else {
+    if (!revocable_tasks.empty()) {
         SCOPED_ATTACH_TASK(requestor);
         auto status = requestor->task_controller()->revoke_memory();
         if (!status.ok()) {
-            ExecEnv::GetInstance()->fragment_mgr()->cancel_query(
-                    requestor->task_controller()->task_id(), status);
+            requestor->task_controller()->cancel(status);
+        }
+        return true;
+    }
+    const auto limit = requestor->memory_context()->mem_limit();
+    const auto reserved_size = 
requestor->memory_context()->reserved_consumption();
+    if (paused_reason.is<ErrorCode::QUERY_MEMORY_EXCEEDED>()) {
+        // During waiting time, another operator in the query may finished and 
release
+        // many memory and we could run.
+        if ((memory_usage + size_to_reserve) < limit) {
+            LOG(INFO) << "Query: " << query_id << ", usage("
+                      << PrettyPrinter::print_bytes(memory_usage) << " + " << 
size_to_reserve
+                      << ") less than limit(" << 
PrettyPrinter::print_bytes(limit)
+                      << "), resume it.";
+            requestor->task_controller()->set_memory_sufficient(true);
+            return true;
+        } else {
+            // if cannot find any memory to release, then let the query 
continue to run as far as possible.
+            // after `disable_reserve_memory`, the query will not enter the 
paused state again,
+            // if the memory is really insufficient, Allocator will throw an 
exception
+            // of query memory limit exceed and the query will be canceled,
+            // or it will be canceled by memory gc when the process memory 
exceeds the limit.
+            auto log_str = fmt::format(
+                    "Query {} memory limit is exceeded, but could "
+                    "not find memory that could release or spill to disk, 
disable reserve "
+                    "memory and resume it. Query memory usage: "
+                    "{}, limit: {}, reserved "
+                    "size: {}, try to reserve: {}, wg info: {}. {}",
+                    query_id, PrettyPrinter::print_bytes(memory_usage),
+                    PrettyPrinter::print_bytes(limit), 
PrettyPrinter::print_bytes(reserved_size),
+                    PrettyPrinter::print_bytes(size_to_reserve), 
wg->memory_debug_string(),
+                    doris::ProcessProfile::instance()
+                            ->memory_profile()
+                            ->process_memory_detail_str());
+            LOG_LONG_STRING(INFO, log_str);
+            // Disable reserve memory will enable query level memory check, if 
the query
+            // need a lot of memory than the memory limit, it will be killed.
+            // Do not need set memlimit = ajusted_mem_limit because workload 
group refresher thread
+            // will update automatically.
+            requestor->task_controller()->disable_reserve_memory();
+            requestor->task_controller()->set_memory_sufficient(true);
+            return true;
+        }
+    } else if (paused_reason.is<ErrorCode::WORKLOAD_GROUP_MEMORY_EXCEEDED>()) {
+        if (!wg->exceed_limit()) {
+            LOG(INFO) << "Query: " << query_id
+                      << " paused caused by WORKLOAD_GROUP_MEMORY_EXCEEDED, 
now resume it.";
+            requestor->task_controller()->set_memory_sufficient(true);
+            return true;
+        } else {
+            Status error_status = Status::MemoryLimitExceeded(
+                    "Query {} workload group memory is exceeded"
+                    ", and there is no cache now. And could not find task to 
spill, "
+                    "try to cancel query. "
+                    "Query memory usage: {}, limit: {}, reserved "
+                    "size: {}, try to reserve: {}, wg info: {}."
+                    " Maybe you should set the workload group's limit to a 
lower value. {}",
+                    query_id, PrettyPrinter::print_bytes(memory_usage),
+                    PrettyPrinter::print_bytes(limit), 
PrettyPrinter::print_bytes(reserved_size),
+                    PrettyPrinter::print_bytes(size_to_reserve), 
wg->memory_debug_string(),
+                    doris::ProcessProfile::instance()
+                            ->memory_profile()
+                            ->process_memory_detail_str());
+            LOG_LONG_STRING(INFO, error_status.to_string());
+            requestor->task_controller()->cancel(error_status);
+            return true;
+        }
+    } else {
+        // Should not consider about process memory. For example, the query's 
limit is 100g, workload
+        // group's memlimit is 10g, process memory is 20g. The query reserve 
will always failed in wg
+        // limit, and process is always have memory, so that it will resume 
and failed reserve again.
+        const size_t test_memory_size = std::max<size_t>(size_to_reserve, 32L 
* 1024 * 1024);
+        if 
(!GlobalMemoryArbitrator::is_exceed_soft_mem_limit(test_memory_size)) {
+            LOG(INFO) << "Query: " << query_id
+                      << ", process limit not exceeded now, resume this query"
+                      << ", process memory info: "
+                      << 
GlobalMemoryArbitrator::process_memory_used_details_str()
+                      << ", wg info: " << wg->debug_string();
+            requestor->task_controller()->set_memory_sufficient(true);
+            return true;
+        } else {
+            // if cannot find any memory to release, then let the query 
continue to run as far as possible
+            // or cancelled by gc if memory is really not enough.
+            Status error_status = Status::MemoryLimitExceeded(
+                    "Query {} process memory is exceeded"
+                    ", and there is no cache now. And could not find task to 
spill, disable "
+                    "reserve memory and resume it. "
+                    "Query memory usage: {}, limit: {}, reserved "
+                    "size: {}, try to reserve: {}, wg info: {}."
+                    " Maybe you should set the workload group's limit to a 
lower value. {}",
+                    query_id, PrettyPrinter::print_bytes(memory_usage),
+                    PrettyPrinter::print_bytes(limit), 
PrettyPrinter::print_bytes(reserved_size),
+                    PrettyPrinter::print_bytes(size_to_reserve), 
wg->memory_debug_string(),
+                    doris::ProcessProfile::instance()
+                            ->memory_profile()
+                            ->process_memory_detail_str());
+            LOG_LONG_STRING(INFO, error_status.to_string());
+            requestor->task_controller()->cancel(error_status);
+            return true;
         }
     }
-    return true;
 }
 
 void WorkloadGroupMgr::update_queries_limit_(WorkloadGroupPtr wg, bool 
enable_hard_limit) {
diff --git a/be/src/runtime/workload_management/query_task_controller.cpp 
b/be/src/runtime/workload_management/query_task_controller.cpp
index 43ef520b794..02385c2e4af 100644
--- a/be/src/runtime/workload_management/query_task_controller.cpp
+++ b/be/src/runtime/workload_management/query_task_controller.cpp
@@ -120,6 +120,15 @@ size_t QueryTaskController::get_revocable_size() {
     return revocable_size;
 }
 
+void QueryTaskController::disable_reserve_memory() {
+    TaskController::disable_reserve_memory();
+    auto query_ctx = query_ctx_.lock();
+    if (query_ctx == nullptr) {
+        return;
+    }
+    query_ctx->query_mem_tracker()->set_enable_check_limit(true);
+}
+
 Status QueryTaskController::revoke_memory() {
     auto query_ctx = query_ctx_.lock();
     if (query_ctx == nullptr) {
diff --git a/be/src/runtime/workload_management/query_task_controller.h 
b/be/src/runtime/workload_management/query_task_controller.h
index 64681177f92..c217572138e 100644
--- a/be/src/runtime/workload_management/query_task_controller.h
+++ b/be/src/runtime/workload_management/query_task_controller.h
@@ -37,6 +37,7 @@ public:
     bool cancel_impl(const Status& reason) override { return 
cancel_impl(reason, -1); }
     bool is_pure_load_task() const override;
     int32_t get_slot_count() const override;
+    void disable_reserve_memory() override;
     bool is_enable_reserve_memory() const override;
     void set_memory_sufficient(bool sufficient) override;
     int64_t memory_sufficient_time() override;
diff --git a/be/src/runtime/workload_management/task_controller.h 
b/be/src/runtime/workload_management/task_controller.h
index de3a3db6fef..c1d9fad4587 100644
--- a/be/src/runtime/workload_management/task_controller.h
+++ b/be/src/runtime/workload_management/task_controller.h
@@ -96,7 +96,7 @@ public:
     virtual bool is_pure_load_task() const { return false; }
     void set_low_memory_mode(bool low_memory_mode) { low_memory_mode_ = 
low_memory_mode; }
     bool low_memory_mode() { return low_memory_mode_; }
-    void disable_reserve_memory() { enable_reserve_memory_ = false; }
+    virtual void disable_reserve_memory() { enable_reserve_memory_ = false; }
     virtual bool is_enable_reserve_memory() const { return 
enable_reserve_memory_; }
     virtual void set_memory_sufficient(bool sufficient) {};
     virtual int64_t memory_sufficient_time() { return 0; };
diff --git a/be/test/pipeline/pipeline_task_test.cpp 
b/be/test/pipeline/pipeline_task_test.cpp
index 987faaa13ec..8a8e16e3c4d 100644
--- a/be/test/pipeline/pipeline_task_test.cpp
+++ b/be/test/pipeline/pipeline_task_test.cpp
@@ -750,9 +750,11 @@ TEST_F(PipelineTaskTest, TEST_RESERVE_MEMORY) {
         EXPECT_EQ(task->_exec_state, PipelineTask::State::RUNNABLE);
         bool done = false;
         EXPECT_TRUE(task->execute(&done).ok());
-        
EXPECT_TRUE(_query_ctx->resource_ctx()->task_controller()->low_memory_mode());
-        
EXPECT_TRUE(task->_operators.front()->cast<DummyOperator>()._low_memory_mode);
-        EXPECT_TRUE(task->_sink->cast<DummySinkOperatorX>()._low_memory_mode);
+        // Not check low memory mode here, because we temporary not use this 
feature, the
+        // system buffer should be checked globally.
+        // 
EXPECT_TRUE(_query_ctx->resource_ctx()->task_controller()->low_memory_mode());
+        // 
EXPECT_TRUE(task->_operators.front()->cast<DummyOperator>()._low_memory_mode);
+        // 
EXPECT_TRUE(task->_sink->cast<DummySinkOperatorX>()._low_memory_mode);
         EXPECT_FALSE(task->_eos);
         EXPECT_FALSE(done);
         EXPECT_FALSE(task->_wake_up_early);
@@ -767,9 +769,9 @@ TEST_F(PipelineTaskTest, TEST_RESERVE_MEMORY) {
         EXPECT_EQ(task->_exec_state, PipelineTask::State::RUNNABLE);
         bool done = false;
         EXPECT_TRUE(task->execute(&done).ok());
-        
EXPECT_TRUE(_query_ctx->resource_ctx()->task_controller()->low_memory_mode());
-        
EXPECT_TRUE(task->_operators.front()->cast<DummyOperator>()._low_memory_mode);
-        EXPECT_TRUE(task->_sink->cast<DummySinkOperatorX>()._low_memory_mode);
+        // 
EXPECT_TRUE(_query_ctx->resource_ctx()->task_controller()->low_memory_mode());
+        // 
EXPECT_TRUE(task->_operators.front()->cast<DummyOperator>()._low_memory_mode);
+        // 
EXPECT_TRUE(task->_sink->cast<DummySinkOperatorX>()._low_memory_mode);
         EXPECT_TRUE(task->_eos);
         EXPECT_TRUE(done);
         EXPECT_FALSE(task->_wake_up_early);
@@ -780,6 +782,9 @@ TEST_F(PipelineTaskTest, TEST_RESERVE_MEMORY) {
     }
 }
 
+// Test for reserve memory fail for non-spillable task. It will not affect 
anything, the query
+// will continue to run. And will disable reserve memory, so that the query 
will failed when allocated
+// memory > limit.
 TEST_F(PipelineTaskTest, TEST_RESERVE_MEMORY_FAIL) {
     {
         _query_options = TQueryOptionsBuilder()
@@ -787,6 +792,7 @@ TEST_F(PipelineTaskTest, TEST_RESERVE_MEMORY_FAIL) {
                                  .set_enable_local_shuffle(true)
                                  .set_runtime_filter_max_in_num(15)
                                  .set_enable_reserve_memory(true)
+                                 .set_enable_spill(false)
                                  .build();
         auto fe_address = TNetworkAddress();
         fe_address.hostname = LOCALHOST;
@@ -882,8 +888,184 @@ TEST_F(PipelineTaskTest, TEST_RESERVE_MEMORY_FAIL) {
         task->_sink->cast<DummySinkOperatorX>()._revocable_mem_size =
                 vectorized::SpillStream::MIN_SPILL_WRITE_BATCH_MEM + 1;
     }
+    {
+        // Reserve failed and but not enable spill disk, so that the query 
will continue to run.
+        read_dep->set_ready();
+        
EXPECT_FALSE(_query_ctx->resource_ctx()->task_controller()->low_memory_mode());
+        
EXPECT_FALSE(task->_operators.front()->cast<DummyOperator>()._low_memory_mode);
+        EXPECT_FALSE(task->_sink->cast<DummySinkOperatorX>()._low_memory_mode);
+        EXPECT_EQ(task->_exec_state, PipelineTask::State::RUNNABLE);
+        EXPECT_FALSE(task->_spilling);
+        bool done = false;
+        EXPECT_TRUE(task->execute(&done).ok());
+        
EXPECT_FALSE(_query_ctx->resource_ctx()->task_controller()->low_memory_mode());
+        
EXPECT_FALSE(task->_operators.front()->cast<DummyOperator>()._low_memory_mode);
+        EXPECT_FALSE(task->_sink->cast<DummySinkOperatorX>()._low_memory_mode);
+        EXPECT_FALSE(task->_eos);
+        // Not enable spill disk, so that task will not be paused.
+        EXPECT_FALSE(task->_spilling);
+        EXPECT_FALSE(done);
+        EXPECT_FALSE(task->_wake_up_early);
+        EXPECT_TRUE(source_finish_dep->ready());
+        
EXPECT_FALSE(_query_ctx->resource_ctx()->task_controller()->is_enable_reserve_memory());
+        EXPECT_TRUE(source_finish_dep->_blocked_task.empty());
+        EXPECT_FALSE(
+                
((MockWorkloadGroupMgr*)ExecEnv::GetInstance()->_workload_group_manager)->_paused);
+    }
+    {
+        // Reserve failed .
+        task->_operators.front()->cast<DummyOperator>()._disable_reserve_mem = 
true;
+        task->_spilling = false;
+        task->_operators.front()->cast<DummyOperator>()._eos = true;
+        
((MockWorkloadGroupMgr*)ExecEnv::GetInstance()->_workload_group_manager)->_paused
 = false;
+        EXPECT_EQ(task->_exec_state, PipelineTask::State::RUNNABLE);
+        bool done = false;
+        EXPECT_TRUE(task->execute(&done).ok());
+        
EXPECT_FALSE(_query_ctx->resource_ctx()->task_controller()->low_memory_mode());
+        
EXPECT_FALSE(task->_operators.front()->cast<DummyOperator>()._low_memory_mode);
+        EXPECT_FALSE(task->_sink->cast<DummySinkOperatorX>()._low_memory_mode);
+        EXPECT_TRUE(task->_eos);
+        EXPECT_FALSE(task->_spilling);
+        EXPECT_TRUE(done);
+        EXPECT_FALSE(task->_wake_up_early);
+        EXPECT_TRUE(source_finish_dep->ready());
+        EXPECT_TRUE(source_finish_dep->_blocked_task.empty());
+        
EXPECT_FALSE(_query_ctx->resource_ctx()->task_controller()->is_enable_reserve_memory());
+        EXPECT_EQ(task->_exec_state, PipelineTask::State::RUNNABLE);
+        EXPECT_FALSE(
+                
((MockWorkloadGroupMgr*)ExecEnv::GetInstance()->_workload_group_manager)->_paused);
+    }
     {
         // Reserve failed and paused.
+        
((MockWorkloadGroupMgr*)ExecEnv::GetInstance()->_workload_group_manager)->_paused
 = false;
+        task->_sink->cast<DummySinkOperatorX>()._disable_reserve_mem = true;
+        EXPECT_EQ(task->_exec_state, PipelineTask::State::RUNNABLE);
+        bool done = false;
+        EXPECT_TRUE(task->execute(&done).ok());
+        
EXPECT_FALSE(_query_ctx->resource_ctx()->task_controller()->low_memory_mode());
+        
EXPECT_FALSE(task->_operators.front()->cast<DummyOperator>()._low_memory_mode);
+        EXPECT_FALSE(task->_sink->cast<DummySinkOperatorX>()._low_memory_mode);
+        EXPECT_TRUE(task->_eos);
+        EXPECT_FALSE(task->_spilling);
+        EXPECT_TRUE(done);
+        EXPECT_FALSE(task->_wake_up_early);
+        EXPECT_TRUE(source_finish_dep->ready());
+        
EXPECT_FALSE(_query_ctx->resource_ctx()->task_controller()->is_enable_reserve_memory());
+        EXPECT_TRUE(source_finish_dep->_blocked_task.empty());
+        EXPECT_EQ(task->_exec_state, PipelineTask::State::RUNNABLE);
+        EXPECT_FALSE(
+                
((MockWorkloadGroupMgr*)ExecEnv::GetInstance()->_workload_group_manager)->_paused);
+    }
+    delete ExecEnv::GetInstance()->_workload_group_manager;
+}
+
+// Test reserve memory fail for spillable pipeline task
+TEST_F(PipelineTaskTest, TEST_RESERVE_MEMORY_FAIL_SPILLABLE) {
+    {
+        _query_options = TQueryOptionsBuilder()
+                                 .set_enable_local_exchange(true)
+                                 .set_enable_local_shuffle(true)
+                                 .set_runtime_filter_max_in_num(15)
+                                 .set_enable_reserve_memory(true)
+                                 .set_enable_spill(true)
+                                 .build();
+        auto fe_address = TNetworkAddress();
+        fe_address.hostname = LOCALHOST;
+        fe_address.port = DUMMY_PORT;
+        _query_ctx =
+                QueryContext::create(_query_id, ExecEnv::GetInstance(), 
_query_options, fe_address,
+                                     true, fe_address, 
QuerySource::INTERNAL_FRONTEND);
+        _task_scheduler = std::make_unique<MockTaskScheduler>();
+        _query_ctx->_task_scheduler = _task_scheduler.get();
+        _build_fragment_context();
+
+        TWorkloadGroupInfo twg_info;
+        twg_info.__set_id(0);
+        twg_info.__set_name("_dummpy_workload_group");
+        twg_info.__set_version(0);
+
+        WorkloadGroupInfo workload_group_info = 
WorkloadGroupInfo::parse_topic_info(twg_info);
+
+        ((MockRuntimeState*)_runtime_state.get())->_workload_group =
+                std::make_shared<WorkloadGroup>(workload_group_info);
+        
((MockThreadMemTrackerMgr*)thread_context()->thread_mem_tracker_mgr.get())
+                ->_test_low_memory = true;
+
+        ExecEnv::GetInstance()->_workload_group_manager = new 
MockWorkloadGroupMgr();
+        EXPECT_TRUE(_runtime_state->enable_spill());
+    }
+    auto num_instances = 1;
+    auto pip_id = 0;
+    auto task_id = 0;
+    auto pip = std::make_shared<Pipeline>(pip_id, num_instances, 
num_instances);
+    Dependency* read_dep;
+    Dependency* write_dep;
+    Dependency* source_finish_dep;
+    {
+        OperatorPtr source_op;
+        // 1. create and set the source operator of 
multi_cast_data_stream_source for new pipeline
+        source_op.reset(new DummyOperator());
+        EXPECT_TRUE(pip->add_operator(source_op, num_instances).ok());
+
+        int op_id = 1;
+        int node_id = 2;
+        int dest_id = 3;
+        DataSinkOperatorPtr sink_op;
+        sink_op.reset(new DummySinkOperatorX(op_id, node_id, dest_id));
+        sink_op->_spillable = true;
+        EXPECT_TRUE(pip->set_sink(sink_op).ok());
+    }
+    auto profile = std::make_shared<RuntimeProfile>("Pipeline : " + 
std::to_string(pip_id));
+    std::map<int,
+             std::pair<std::shared_ptr<BasicSharedState>, 
std::vector<std::shared_ptr<Dependency>>>>
+            shared_state_map;
+    _runtime_state->resize_op_id_to_local_state(-1);
+    auto task = std::make_shared<PipelineTask>(pip, task_id, 
_runtime_state.get(), _context,
+                                               profile.get(), 
shared_state_map, task_id);
+    {
+        std::vector<TScanRangeParams> scan_range;
+        int sender_id = 0;
+        TDataSink tsink;
+        EXPECT_TRUE(task->prepare(scan_range, sender_id, tsink).ok());
+        EXPECT_EQ(task->_exec_state, PipelineTask::State::RUNNABLE);
+        EXPECT_GT(task->_execution_dependencies.size(), 1);
+        read_dep = 
_runtime_state->get_local_state_result(task->_operators.front()->operator_id())
+                           .value()
+                           ->dependencies()
+                           .front();
+        write_dep = 
_runtime_state->get_sink_local_state()->dependencies().front();
+    }
+    {
+        _query_ctx->get_execution_dependency()->set_ready();
+        // Task is blocked by read dependency.
+        read_dep->block();
+        EXPECT_EQ(task->_exec_state, PipelineTask::State::RUNNABLE);
+        bool done = false;
+        EXPECT_TRUE(task->execute(&done).ok());
+        EXPECT_FALSE(task->_eos);
+        EXPECT_FALSE(done);
+        EXPECT_FALSE(task->_wake_up_early);
+        EXPECT_FALSE(task->_read_dependencies.empty());
+        EXPECT_FALSE(task->_write_dependencies.empty());
+        EXPECT_FALSE(task->_finish_dependencies.empty());
+        EXPECT_TRUE(task->_opened);
+        EXPECT_FALSE(read_dep->ready());
+        EXPECT_TRUE(write_dep->ready());
+        EXPECT_FALSE(read_dep->_blocked_task.empty());
+        source_finish_dep =
+                
_runtime_state->get_local_state_result(task->_operators.front()->operator_id())
+                        .value()
+                        ->finishdependency();
+        EXPECT_EQ(task->_exec_state, PipelineTask::State::BLOCKED);
+    }
+    {
+        task->_operators.front()->cast<DummyOperator>()._revocable_mem_size =
+                vectorized::SpillStream::MIN_SPILL_WRITE_BATCH_MEM + 1;
+        task->_sink->cast<DummySinkOperatorX>()._revocable_mem_size =
+                vectorized::SpillStream::MIN_SPILL_WRITE_BATCH_MEM + 1;
+    }
+    {
+        // Reserve failed and enable spill disk, so that the query be paused.
         read_dep->set_ready();
         
EXPECT_FALSE(_query_ctx->resource_ctx()->task_controller()->low_memory_mode());
         
EXPECT_FALSE(task->_operators.front()->cast<DummyOperator>()._low_memory_mode);
@@ -896,9 +1078,11 @@ TEST_F(PipelineTaskTest, TEST_RESERVE_MEMORY_FAIL) {
         
EXPECT_FALSE(task->_operators.front()->cast<DummyOperator>()._low_memory_mode);
         EXPECT_FALSE(task->_sink->cast<DummySinkOperatorX>()._low_memory_mode);
         EXPECT_FALSE(task->_eos);
+        // Not enable spill disk, so that task will not be paused.
         EXPECT_TRUE(task->_spilling);
         EXPECT_FALSE(done);
         EXPECT_FALSE(task->_wake_up_early);
+        
EXPECT_TRUE(_query_ctx->resource_ctx()->task_controller()->is_enable_reserve_memory());
         EXPECT_TRUE(source_finish_dep->ready());
         EXPECT_TRUE(source_finish_dep->_blocked_task.empty());
         EXPECT_TRUE(
@@ -921,14 +1105,15 @@ TEST_F(PipelineTaskTest, TEST_RESERVE_MEMORY_FAIL) {
         EXPECT_FALSE(done);
         EXPECT_FALSE(task->_wake_up_early);
         EXPECT_TRUE(source_finish_dep->ready());
+        
EXPECT_TRUE(_query_ctx->resource_ctx()->task_controller()->is_enable_reserve_memory());
         EXPECT_TRUE(source_finish_dep->_blocked_task.empty());
         EXPECT_EQ(task->_exec_state, PipelineTask::State::RUNNABLE);
         EXPECT_TRUE(
                 
((MockWorkloadGroupMgr*)ExecEnv::GetInstance()->_workload_group_manager)->_paused);
     }
     {
-        // Reserve failed and paused.
         
((MockWorkloadGroupMgr*)ExecEnv::GetInstance()->_workload_group_manager)->_paused
 = false;
+        // Disable reserve memory, so that the get_reserve_mem_size == 0, so 
that reserve will always success
         task->_sink->cast<DummySinkOperatorX>()._disable_reserve_mem = true;
         EXPECT_EQ(task->_exec_state, PipelineTask::State::RUNNABLE);
         bool done = false;
@@ -940,6 +1125,7 @@ TEST_F(PipelineTaskTest, TEST_RESERVE_MEMORY_FAIL) {
         EXPECT_FALSE(task->_spilling);
         EXPECT_TRUE(done);
         EXPECT_FALSE(task->_wake_up_early);
+        
EXPECT_TRUE(_query_ctx->resource_ctx()->task_controller()->is_enable_reserve_memory());
         EXPECT_TRUE(source_finish_dep->ready());
         EXPECT_TRUE(source_finish_dep->_blocked_task.empty());
         EXPECT_EQ(task->_exec_state, PipelineTask::State::RUNNABLE);
diff --git a/be/test/pipeline/thrift_builder.h 
b/be/test/pipeline/thrift_builder.h
index 1f792553e66..cf03d609f0c 100644
--- a/be/test/pipeline/thrift_builder.h
+++ b/be/test/pipeline/thrift_builder.h
@@ -96,6 +96,11 @@ public:
         return *this;
     }
 
+    TQueryOptionsBuilder& set_enable_spill(int64_t enable_spill) {
+        _query_options.__set_enable_spill(enable_spill);
+        return *this;
+    }
+
     TQueryOptions& build() { return _query_options; }
 
     TQueryOptionsBuilder(const TQueryOptionsBuilder&) = delete;
diff --git a/be/test/runtime/workload_group/workload_group_manager_test.cpp 
b/be/test/runtime/workload_group/workload_group_manager_test.cpp
index d3c589552f2..f81488928c9 100644
--- a/be/test/runtime/workload_group/workload_group_manager_test.cpp
+++ b/be/test/runtime/workload_group/workload_group_manager_test.cpp
@@ -231,8 +231,8 @@ TEST_F(WorkloadGroupManagerTest, wg_exceed3) {
 
     query_context->query_mem_tracker()->consume(-1024L * 1024 * 4);
 
-    // Query was not cancelled, because the query's limit is bigger than the 
wg's limit and the wg's policy is NONE.
-    ASSERT_FALSE(query_context->is_cancelled());
+    // In the wg's policy is NONE. If the query reserve memory failed and 
revocable memory == 0, just cancel it.
+    ASSERT_TRUE(query_context->is_cancelled());
     // Its limit == workload group's limit
     ASSERT_EQ(query_context->resource_ctx()->memory_context()->mem_limit(), 
wg->memory_limit());
 
diff --git a/gensrc/thrift/PaloInternalService.thrift 
b/gensrc/thrift/PaloInternalService.thrift
index aabebaa0b1b..65ae7e876b4 100644
--- a/gensrc/thrift/PaloInternalService.thrift
+++ b/gensrc/thrift/PaloInternalService.thrift
@@ -146,7 +146,7 @@ struct TQueryOptions {
   // if set, this will overwrite the BE config.
   30: optional i32 max_pushdown_conditions_per_column
   // whether enable spilling to disk
-  31: optional bool enable_spilling = false;
+  // 31: optional bool enable_spilling = false;
   // whether enable parallel merge in exchange node
   32: optional bool enable_enable_exchange_node_parallel_merge = false; // 
deprecated
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to