This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch spill_and_reserve
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/spill_and_reserve by this push:
     new ae974c9435b [refactor] refactor spill strategy (#41195)
ae974c9435b is described below

commit ae974c9435b7f73c1afd5dce81958e0c0b65a5d2
Author: yiguolei <[email protected]>
AuthorDate: Tue Sep 24 11:57:13 2024 +0800

    [refactor] refactor spill strategy (#41195)
    
    ## Proposed changes
    
    Issue Number: close #xxx
    
    <!--Describe your changes.-->
    
    Co-authored-by: yiguolei <[email protected]>
---
 be/src/common/status.h                             |   3 +
 be/src/pipeline/pipeline_task.cpp                  |  17 +-
 be/src/runtime/memory/thread_mem_tracker_mgr.h     |   6 +-
 be/src/runtime/query_context.cpp                   |   4 +
 be/src/runtime/query_context.h                     |  24 ++
 be/src/runtime/workload_group/workload_group.h     |   5 +
 .../workload_group/workload_group_manager.cpp      | 312 +++++++++------------
 .../workload_group/workload_group_manager.h        |   6 +-
 8 files changed, 184 insertions(+), 193 deletions(-)

diff --git a/be/src/common/status.h b/be/src/common/status.h
index e95b9343167..c6e2712dc9c 100644
--- a/be/src/common/status.h
+++ b/be/src/common/status.h
@@ -130,6 +130,9 @@ namespace ErrorCode {
     E(BAD_CAST, -254, true);                                 \
     E(ARITHMETIC_OVERFLOW_ERRROR, -255, false);              \
     E(PERMISSION_DENIED, -256, false);                       \
+    E(QUERY_MEMORY_EXCEED, -257, false);                     \
+    E(WORKLOAD_GROUP_MEMORY_EXCEED, -257, false);            \
+    E(PROCESS_MEMORY_EXCEED, -257, false);                   \
     E(CE_CMD_PARAMS_ERROR, -300, true);                      \
     E(CE_BUFFER_TOO_SMALL, -301, true);                      \
     E(CE_CMD_NOT_VALID, -302, true);                         \
diff --git a/be/src/pipeline/pipeline_task.cpp 
b/be/src/pipeline/pipeline_task.cpp
index f7e8da9f948..28ca578757d 100644
--- a/be/src/pipeline/pipeline_task.cpp
+++ b/be/src/pipeline/pipeline_task.cpp
@@ -386,12 +386,14 @@ Status PipelineTask::execute(bool* eos) {
             if (workload_group && reserve_size > 0) {
                 auto st = thread_context()->try_reserve_memory(reserve_size);
                 if (!st.ok()) {
-                    VLOG_DEBUG << "query: " << print_id(query_id)
-                               << ", try to reserve: " << reserve_size << 
"(sink reserve size:("
-                               << sink_reserve_size << " )"
-                               << ", sink name: " << _sink->get_name()
-                               << ", node id: " << _sink->node_id() << " 
failed: " << st.to_string()
-                               << ", debug info: " << 
GlobalMemoryArbitrator::process_mem_log_str();
+                    LOG(INFO) << "query: " << print_id(query_id)
+                              << ", try to reserve: " << reserve_size << 
"(sink reserve size:("
+                              << sink_reserve_size << " )"
+                              << ", sink name: " << _sink->get_name()
+                              << ", node id: " << _sink->node_id() << " 
failed: " << st.to_string()
+                              << ", debug info: " << 
GlobalMemoryArbitrator::process_mem_log_str();
+
+                    _state->get_query_ctx()->update_paused_reason(st);
                     _state->get_query_ctx()->set_low_memory_mode();
                     bool is_high_wartermark = false;
                     bool is_low_wartermark = false;
@@ -401,7 +403,7 @@ Status PipelineTask::execute(bool* eos) {
                     /// If the available memory for revoking is large enough, 
here trigger revoking proactively.
                     bool need_to_pause = false;
                     const auto revocable_mem_size = 
_sink->revocable_mem_size(_state);
-                    if (revocable_mem_size > 1024L * 1024 * 1024) {
+                    if (revocable_mem_size > 100L * 1024 * 1024) {
                         LOG(INFO) << "query: " << print_id(query_id)
                                   << ", task id: " << _state->task_id()
                                   << " has big memory to revoke: " << 
revocable_mem_size;
@@ -410,7 +412,6 @@ Status PipelineTask::execute(bool* eos) {
                     } else {
                         need_to_pause = is_low_wartermark || 
is_high_wartermark;
                     }
-
                     if (need_to_pause) {
                         _memory_sufficient_dependency->block();
                         
ExecEnv::GetInstance()->workload_group_mgr()->add_paused_query(
diff --git a/be/src/runtime/memory/thread_mem_tracker_mgr.h 
b/be/src/runtime/memory/thread_mem_tracker_mgr.h
index fd14750d8b8..01df735fff1 100644
--- a/be/src/runtime/memory/thread_mem_tracker_mgr.h
+++ b/be/src/runtime/memory/thread_mem_tracker_mgr.h
@@ -290,7 +290,7 @@ inline doris::Status 
ThreadMemTrackerMgr::try_reserve(int64_t size) {
                 "reserve memory failed, size: {}, because memory tracker 
consumption: {}, limit: "
                 "{}",
                 size, _limiter_tracker->consumption(), 
_limiter_tracker->limit());
-        return doris::Status::MemoryLimitExceeded(err_msg);
+        return doris::Status::Error<ErrorCode::QUERY_MEMORY_EXCEED>(err_msg);
     }
     auto wg_ptr = _wg_wptr.lock();
     if (wg_ptr) {
@@ -299,7 +299,7 @@ inline doris::Status 
ThreadMemTrackerMgr::try_reserve(int64_t size) {
                                        wg_ptr->memory_debug_string());
             _limiter_tracker->release(size);          // rollback
             _limiter_tracker->release_reserved(size); // rollback
-            return doris::Status::MemoryLimitExceeded(err_msg);
+            return 
doris::Status::Error<ErrorCode::WORKLOAD_GROUP_MEMORY_EXCEED>(err_msg);
         }
     }
     if (!doris::GlobalMemoryArbitrator::try_reserve_process_memory(size)) {
@@ -310,7 +310,7 @@ inline doris::Status 
ThreadMemTrackerMgr::try_reserve(int64_t size) {
         if (wg_ptr) {
             wg_ptr->sub_wg_refresh_interval_memory_growth(size); // rollback
         }
-        return doris::Status::MemoryLimitExceeded(err_msg);
+        return doris::Status::Error<ErrorCode::PROCESS_MEMORY_EXCEED>(err_msg);
     }
     _reserved_mem += size;
     return doris::Status::OK();
diff --git a/be/src/runtime/query_context.cpp b/be/src/runtime/query_context.cpp
index 47d750978be..2932bb708ec 100644
--- a/be/src/runtime/query_context.cpp
+++ b/be/src/runtime/query_context.cpp
@@ -224,6 +224,10 @@ void QueryContext::set_execution_dependency_ready() {
 
 void QueryContext::set_memory_sufficient(bool sufficient) {
     if (sufficient) {
+        {
+            std::lock_guard l(_paused_mutex);
+            _paused_reason = Status::OK();
+        }
         _memory_sufficient_dependency->set_ready();
     } else {
         _memory_sufficient_dependency->block();
diff --git a/be/src/runtime/query_context.h b/be/src/runtime/query_context.h
index 030de735245..471f82ba807 100644
--- a/be/src/runtime/query_context.h
+++ b/be/src/runtime/query_context.h
@@ -321,6 +321,27 @@ public:
 
     bool low_memory_mode() { return _low_memory_mode; }
 
+    void update_paused_reason(const Status& st) {
+        std::lock_guard l(_paused_mutex);
+        if (_paused_reason.is<ErrorCode::QUERY_MEMORY_EXCEED>()) {
+            return;
+        } else if 
(_paused_reason.is<ErrorCode::WORKLOAD_GROUP_MEMORY_EXCEED>()) {
+            if (st.is<ErrorCode::QUERY_MEMORY_EXCEED>()) {
+                _paused_reason = st;
+                return;
+            } else {
+                return;
+            }
+        } else {
+            _paused_reason = st;
+        }
+    }
+
+    Status paused_reason() {
+        std::lock_guard l(_paused_mutex);
+        return _paused_reason;
+    }
+
 private:
     int _timeout_second;
     TUniqueId _query_id;
@@ -377,6 +398,9 @@ private:
     // help us manage the query.
     QuerySource _query_source;
 
+    Status _paused_reason;
+    std::mutex _paused_mutex;
+
     // when fragment of pipeline is closed, it will register its profile to 
this map by using add_fragment_profile
     // flatten profile of one fragment:
     // Pipeline 0
diff --git a/be/src/runtime/workload_group/workload_group.h 
b/be/src/runtime/workload_group/workload_group.h
index b00bbfe68b5..86e5f494f77 100644
--- a/be/src/runtime/workload_group/workload_group.h
+++ b/be/src/runtime/workload_group/workload_group.h
@@ -235,6 +235,10 @@ public:
 
     int64_t load_buffer_limit() { return _load_buffer_limit; }
 
+    void update_memory_sufficent(bool is_sufficient) { _is_sufficient = 
is_sufficient; }
+
+    bool memory_sufficent() const { return _is_sufficient; }
+
 private:
     mutable std::shared_mutex _mutex; // lock _name, _version, _cpu_share, 
_memory_limit
     const uint64_t _id;
@@ -245,6 +249,7 @@ private:
     // If the wg's memory reached high water mark, then the load buffer
     // will be restricted to this limit.
     int64_t _load_buffer_limit;
+    std::atomic<bool> _is_sufficient = true;
 
     // memory used by load memtable
     int64_t _active_mem_usage = 0;
diff --git a/be/src/runtime/workload_group/workload_group_manager.cpp 
b/be/src/runtime/workload_group/workload_group_manager.cpp
index 39e20db4fca..b9a7989d59e 100644
--- a/be/src/runtime/workload_group/workload_group_manager.cpp
+++ b/be/src/runtime/workload_group/workload_group_manager.cpp
@@ -34,8 +34,10 @@
 
 namespace doris {
 
-PausedQuery::PausedQuery(std::shared_ptr<QueryContext> query_ctx)
-        : query_ctx_(query_ctx), query_id_(print_id(query_ctx->query_id())) {
+PausedQuery::PausedQuery(std::shared_ptr<QueryContext> query_ctx, double 
cache_ratio)
+        : query_ctx_(query_ctx),
+          cache_ratio_(cache_ratio),
+          query_id_(print_id(query_ctx->query_id())) {
     enqueue_at = std::chrono::system_clock::now();
 }
 
@@ -196,7 +198,8 @@ void WorkloadGroupMgr::refresh_wg_weighted_memory_limit() {
             doris::GlobalMemoryArbitrator::sys_mem_available_details_str(),
             PrettyPrinter::print(all_workload_groups_mem_usage, TUnit::BYTES),
             weighted_memory_limit_ratio);
-    LOG_EVERY_T(INFO, 60) << debug_msg;
+    // LOG_EVERY_T(INFO, 60) << debug_msg;
+    LOG(INFO) << debug_msg;
     for (auto& wg : _workload_groups) {
         auto wg_mem_limit = wg.second->memory_limit();
         auto wg_weighted_mem_limit = int64_t(wg_mem_limit * 
weighted_memory_limit_ratio);
@@ -272,7 +275,8 @@ void WorkloadGroupMgr::refresh_wg_weighted_memory_limit() {
             } else {
                 // If low water mark is not reached, then use process memory 
limit as query memory limit.
                 // It means it will not take effect.
-                if (!is_low_wartermark) {
+                // If there are some query in paused list, then limit should 
take effect.
+                if (!is_low_wartermark && wg.second->memory_sufficent()) {
                     query_weighted_mem_limit = wg_high_water_mark_limit;
                 } else {
                     query_weighted_mem_limit =
@@ -293,7 +297,10 @@ void WorkloadGroupMgr::refresh_wg_weighted_memory_limit() {
 
             query_ctx->set_mem_limit(query_weighted_mem_limit);
         }
-        LOG_EVERY_T(INFO, 60) << debug_msg;
+        // During memory insufficent stage, we already set every query's 
memlimit, so that the flag is useless any more.
+        wg.second->update_memory_sufficent(true);
+        // LOG_EVERY_T(INFO, 60) << debug_msg;
+        LOG(INFO) << debug_msg;
     }
 }
 
@@ -325,37 +332,31 @@ void WorkloadGroupMgr::add_paused_query(const 
std::shared_ptr<QueryContext>& que
     std::lock_guard<std::mutex> lock(_paused_queries_lock);
     DCHECK(query_ctx != nullptr);
     auto wg = query_ctx->workload_group();
-    auto&& [it, inserted] = _paused_queries_list[wg].emplace(query_ctx);
+    auto&& [it, inserted] = _paused_queries_list[wg].emplace(
+            query_ctx, 
doris::GlobalMemoryArbitrator::last_affected_cache_capacity_adjust_weighted);
     if (inserted) {
-        LOG(INFO) << "here insert one new paused query: " << it->query_id()
-                  << ", wg: " << (void*)(wg.get());
+        LOG(INFO) << "workload group " << wg->debug_string()
+                  << " insert one new paused query: " << it->query_id();
     }
 }
 
 /**
  * Strategy 1: A revocable query should not have any running 
task(PipelineTask).
- * strategy 2: If the workload group is below low water mark, we make all 
queries in this wg runnable.
- * strategy 3: Pick the query which has the max revocable size to revoke 
memory.
- * strategy 4: If all queries are not revocable and they all have not any 
running task,
- *             we choose the max memory usage query to cancel.
+ * strategy 2: If the workload group has any task exceed workload group 
memlimit, then set all queryctx's memlimit
+ * strategy 3: If any query exceed process memlimit, then should clear all 
caches.
+ * strategy 4: If any query exceed query's memlimit, then do spill disk or 
cancel it.
+ * strategy 5: If any query exceed process's memlimit and cache is zero, then 
do spill disk or cancel it.
  */
 void WorkloadGroupMgr::handle_paused_queries() {
     std::unique_lock<std::mutex> lock(_paused_queries_lock);
-    if (_paused_queries_list.empty()) {
-        if 
(doris::GlobalMemoryArbitrator::last_wg_trigger_cache_capacity_adjust_weighted 
!= 1) {
-            
doris::GlobalMemoryArbitrator::last_wg_trigger_cache_capacity_adjust_weighted = 
1;
-            doris::GlobalMemoryArbitrator::notify_cache_adjust_capacity();
-            LOG(INFO) << "There are no queries in paused list, so that set 
cache capacity to 1 now";
-        }
-        return;
-    }
-
     for (auto it = _paused_queries_list.begin(); it != 
_paused_queries_list.end();) {
         auto& queries_list = it->second;
         const auto& wg = it->first;
         if (queries_list.empty()) {
-            LOG(INFO) << "wg: " << wg->debug_string() << " has no paused 
query";
+            LOG(INFO) << "wg: " << wg->debug_string()
+                      << " has no paused query, update it to memory sufficent";
             it = _paused_queries_list.erase(it);
+            wg->update_memory_sufficent(true);
             continue;
         }
 
@@ -364,184 +365,133 @@ void WorkloadGroupMgr::handle_paused_queries() {
 
         wg->check_mem_used(&is_low_wartermark, &is_high_wartermark);
 
-        if (!is_low_wartermark && !is_high_wartermark) {
-            // TODO: should check if there is a large reserve size in the 
query's operators
-            // If it exist, then should find the query and spill it.
-            LOG(INFO) << "**** there are " << queries_list.size() << " to 
resume";
-            for (const auto& query : queries_list) {
-                LOG(INFO) << "**** resume paused query: " << query.query_id();
-                auto query_ctx = query.query_ctx_.lock();
-                if (query_ctx != nullptr) {
-                    query_ctx->set_memory_sufficient(true);
-                }
-            }
-
-            queries_list.clear();
-            it = _paused_queries_list.erase(it);
-            continue;
-        } else {
-            ++it;
-        }
-
-        // If the wg's query list is empty, then should do nothing
-        if (queries_list.empty()) {
-            continue;
-        }
-
-        // TODO: should check buffer type memory first, if could release many 
these memory, then not need do spill disk
-        // Buffer Memory are:
-        // 1. caches: page cache, segment cache...
-        // 2. memtables: load memtable
-        // 3. scan queue, exchange sink buffer, union queue
-        // 4. streaming aggs.
-        // If we could not recycle memory from these buffers(< 10%), then do 
spill disk.
-
-        // 1. Check cache used, if cache is larger than > 0, then just return 
and wait for it to 0 to release some memory.
-        if 
(doris::GlobalMemoryArbitrator::last_affected_cache_capacity_adjust_weighted > 
0 &&
-            
doris::GlobalMemoryArbitrator::last_wg_trigger_cache_capacity_adjust_weighted > 
0) {
-            
doris::GlobalMemoryArbitrator::last_wg_trigger_cache_capacity_adjust_weighted = 
0;
-            doris::GlobalMemoryArbitrator::notify_cache_adjust_capacity();
-            LOG(INFO) << "There are some queries need memory, so that set 
cache capacity to 0 now";
-            // If there is cache, then return, only check to do spill disk 
when cache is larger than 0.
-            return;
-        }
-
-        // 2. If memtable size larger than 10% of wg's limit, then flush 
memtable and wait.
-        MemTableMemoryLimiter* memtable_limiter =
-                doris::ExecEnv::GetInstance()->memtable_memory_limiter();
-        // Not use memlimit, should use high water mark.
-        int64_t memtable_active_bytes = 0;
-        int64_t memtable_write_bytes = 0;
-        int64_t memtable_flush_bytes = 0;
-        wg->get_load_mem_usage(&memtable_active_bytes, &memtable_write_bytes,
-                               &memtable_flush_bytes);
-        // TODO: should add a signal in memtable limiter to prevent new batch
-        // For example, streamload, it will not reserve many memory, but it 
will occupy many memtable memory.
-        // TODO: 0.2 should be a workload group properties. For example, the 
group is optimized for load,then the value
-        // should be larged, if the group is optimized for query, then the 
value should be smaller.
-        int64_t max_wg_memtable_bytes = wg->load_buffer_limit();
-        if (memtable_active_bytes + memtable_write_bytes + 
memtable_flush_bytes >
-            max_wg_memtable_bytes) {
-            // There are many table in flush queue, just waiting them flush 
finished.
-            if (memtable_active_bytes < (int64_t)(max_wg_memtable_bytes * 
0.8)) {
-                LOG_EVERY_T(INFO, 60)
-                        << wg->name() << " load memtable size is: " << 
memtable_active_bytes << ", "
-                        << memtable_write_bytes << ", " << memtable_flush_bytes
-                        << ", wait for flush finished to release more memory";
-                continue;
-            } else {
-                // Flush some memtables(currently written) to flush queue.
-                memtable_limiter->flush_workload_group_memtables(
-                        wg->id(), memtable_active_bytes - 
(int64_t)(max_wg_memtable_bytes * 0.8));
-                LOG_EVERY_T(INFO, 60)
-                        << wg->name() << " load memtable size is: " << 
memtable_active_bytes << ", "
-                        << memtable_write_bytes << ", " << memtable_flush_bytes
-                        << ", flush half of active memtable to revoke memory";
-                continue;
-            }
-        }
-
-        // If the wg enable memory overcommit, then not spill, just cancel 
query.
-        // if (wg->enable_memory_overcommit()) {
-        // TODO  should cancel top query here.
-        //    continue;
-        //}
-
-        std::shared_ptr<QueryContext> max_revocable_query;
-        std::shared_ptr<QueryContext> max_memory_usage_query;
-        std::shared_ptr<QueryContext> running_query;
-        bool has_running_query = false;
-        size_t max_revocable_size = 0;
-        size_t max_memory_usage = 0;
-        auto it_to_remove = queries_list.end();
-
+        // If the query is paused because its limit exceed the query itself's 
memlimit, then just spill disk.
+        // The query's memlimit is set using slot mechanism and its value is 
set using the user settings, not
+        // by weighted value. So if reserve failed, then it is actually exceed 
limit.
         for (auto query_it = queries_list.begin(); query_it != 
queries_list.end();) {
-            const auto query_ctx = query_it->query_ctx_.lock();
+            auto query_ctx = query_it->query_ctx_.lock();
             // The query is finished during in paused list.
             if (query_ctx == nullptr) {
                 query_it = queries_list.erase(query_it);
                 continue;
             }
-            size_t revocable_size = 0;
-            size_t memory_usage = 0;
-            bool has_running_task = false;
-
             if (query_ctx->is_cancelled()) {
                 LOG(INFO) << "query: " << print_id(query_ctx->query_id())
                           << "was canceled, remove from paused list";
                 query_it = queries_list.erase(query_it);
                 continue;
             }
-
-            query_ctx->get_revocable_info(&revocable_size, &memory_usage, 
&has_running_task);
-            if (has_running_task) {
-                has_running_query = true;
-                running_query = query_ctx;
-                break;
-            } else if (revocable_size > max_revocable_size) {
-                max_revocable_query = query_ctx;
-                max_revocable_size = revocable_size;
-                it_to_remove = query_it;
-            } else if (memory_usage > max_memory_usage) {
-                max_memory_usage_query = query_ctx;
-                max_memory_usage = memory_usage;
-                it_to_remove = query_it;
-            }
-
-            ++query_it;
-        }
-
-        if (has_running_query) {
-            LOG(INFO) << "has running task, query: " << 
print_id(running_query->query_id());
-        } else if (max_revocable_query) {
-            queries_list.erase(it_to_remove);
-            queries_list.insert(queries_list.begin(), max_revocable_query);
-
-            auto revocable_tasks = max_revocable_query->get_revocable_tasks();
-            DCHECK(!revocable_tasks.empty());
-
-            LOG(INFO) << "query: " << 
print_id(max_revocable_query->query_id()) << ", has "
-                      << revocable_tasks.size()
-                      << " tasks to revoke memory, max revocable size: " << 
max_revocable_size;
-            SCOPED_ATTACH_TASK(max_revocable_query.get());
-            for (auto* task : revocable_tasks) {
-                auto st = task->revoke_memory();
-                if (!st.ok()) {
-                    max_revocable_query->cancel(st);
-                    break;
+            if 
(query_ctx->paused_reason().is<ErrorCode::QUERY_MEMORY_EXCEED>()) {
+                bool spill_res = spill_or_cancel_query(query_ctx, 
query_ctx->paused_reason());
+                if (!spill_res) {
+                    ++query_it;
+                    continue;
+                } else {
+                    query_it = queries_list.erase(query_it);
+                    continue;
                 }
-            }
-        } else if (max_memory_usage_query) {
-            bool new_is_low_wartermark = false;
-            bool new_is_high_wartermark = false;
-            const auto query_id = print_id(max_memory_usage_query->query_id());
-            wg->check_mem_used(&new_is_low_wartermark, 
&new_is_high_wartermark);
-            if (new_is_high_wartermark) {
-                if (it_to_remove->elapsed_time() < 2000) {
-                    LOG(INFO) << "memory insufficient and cannot find 
revocable query, "
-                                 "the max usage query: "
-                              << query_id << ", usage: " << max_memory_usage
-                              << ", elapsed: " << it_to_remove->elapsed_time()
-                              << ", wg info: " << wg->debug_string();
+            } else if 
(query_ctx->paused_reason().is<ErrorCode::WORKLOAD_GROUP_MEMORY_EXCEED>()) {
+                if (wg->memory_sufficent()) {
+                    wg->update_memory_sufficent(false);
+                    LOG(INFO) << "query: " << print_id(query_ctx->query_id())
+                              << " reserve memory failed due to workload group 
memory exceed, "
+                                 "should set the workload group work in memory 
insufficent mode, "
+                                 "so that other query will reduce their 
memory. wg: "
+                              << wg->debug_string();
+                }
+                // Should not put the query back to task scheduler 
immediately, because when wg's memory not sufficient,
+                // and then set wg's flag, other query may not free memory 
very quickly.
+                if (query_it->elapsed_time() > 1000) {
+                    // set wg's memory to insufficent, then add it back to 
task scheduler to run.
+                    query_ctx->set_memory_sufficient(true);
+                    query_it = queries_list.erase(query_it);
+                } else {
+                    ++query_it;
+                }
+                continue;
+            } else {
+                // If wg's memlimit not exceed, but process memory exceed, it 
means cache or other metadata
+                // used too much memory. Should clean all cache here.
+                // 1. Check cache used, if cache is larger than > 0, then just 
return and wait for it to 0 to release some memory.
+                if 
(doris::GlobalMemoryArbitrator::last_affected_cache_capacity_adjust_weighted >
+                            0.001 &&
+                    
doris::GlobalMemoryArbitrator::last_wg_trigger_cache_capacity_adjust_weighted >
+                            0.001) {
+                    
doris::GlobalMemoryArbitrator::last_wg_trigger_cache_capacity_adjust_weighted =
+                            0;
+                    
doris::GlobalMemoryArbitrator::notify_cache_adjust_capacity();
+                    LOG(INFO) << "There are some queries need process memory, 
so that set cache "
+                                 "capacity "
+                                 "to 0 now";
+                }
+                if (query_it->cache_ratio_ < 0.001) {
+                    bool spill_res = spill_or_cancel_query(query_ctx, 
query_ctx->paused_reason());
+                    if (!spill_res) {
+                        ++query_it;
+                        continue;
+                    } else {
+                        query_it = queries_list.erase(query_it);
+                        continue;
+                    }
+                }
+                if 
(doris::GlobalMemoryArbitrator::last_affected_cache_capacity_adjust_weighted <
+                            0.001 &&
+                    query_it->cache_ratio_ > 0.001) {
+                    query_ctx->set_memory_sufficient(true);
+                    query_it = queries_list.erase(query_it);
                     continue;
                 }
-                max_memory_usage_query->cancel(Status::InternalError(
-                        "memory insufficient and cannot find revocable query, 
cancel "
-                        "the "
-                        "biggest usage({}) query({})",
-                        max_memory_usage, query_id));
-                queries_list.erase(it_to_remove);
+                ++query_it;
+            }
+        }
 
-            } else {
-                LOG(INFO) << "non high water mark, resume "
-                             "the query: "
-                          << query_id << ", usage: " << max_memory_usage
-                          << ", wg info: " << wg->debug_string();
-                max_memory_usage_query->set_memory_sufficient(true);
-                queries_list.erase(it_to_remove);
+        // Finished deal with one workload group, and should deal with next 
one.
+        ++it;
+    }
+}
+
+bool WorkloadGroupMgr::spill_or_cancel_query(std::shared_ptr<QueryContext> 
query_ctx,
+                                             Status paused_reason) {
+    // TODO: If the query is an insert into select query, should consider 
memtable as revoke memory.
+    size_t revocable_size = 0;
+    size_t memory_usage = 0;
+    bool has_running_task = false;
+    query_ctx->get_revocable_info(&revocable_size, &memory_usage, 
&has_running_task);
+    if (has_running_task) {
+        LOG(INFO) << "query: " << print_id(query_ctx->query_id())
+                  << "is paused, but still has running task, skip it.";
+        return false;
+    }
+
+    auto revocable_tasks = query_ctx->get_revocable_tasks();
+    if (revocable_tasks.empty()) {
+        if (paused_reason.is<ErrorCode::QUERY_MEMORY_EXCEED>()) {
+            // Use MEM_LIMIT_EXCEEDED so that FE could parse the error code 
and do try logic
+            
query_ctx->cancel(doris::Status::Error<ErrorCode::MEM_LIMIT_EXCEEDED>(
+                    "query reserve memory failed, but could not find  memory 
that "
+                    "could "
+                    "release or spill to disk"));
+        } else {
+            
query_ctx->cancel(doris::Status::Error<ErrorCode::MEM_LIMIT_EXCEEDED>(
+                    "The query reserved memory failed because process limit 
exceeded, and "
+                    "there is no cache now. And could not find task to spill. 
Maybe you should set "
+                    "the workload group's limit to a lower value."));
+        }
+    } else {
+        SCOPED_ATTACH_TASK(query_ctx.get());
+        // TODO, should spill the task that has max memory, not all
+        for (auto* task : revocable_tasks) {
+            auto st = task->revoke_memory();
+            if (!st.ok()) {
+                query_ctx->cancel(st);
+                break;
             }
         }
+        LOG(INFO) << "query: " << print_id(query_ctx->query_id()) << ", has "
+                  << revocable_tasks.size()
+                  << " tasks to revoke memory, revocable size: " << 
revocable_size;
     }
+    return true;
 }
 
 void WorkloadGroupMgr::stop() {
diff --git a/be/src/runtime/workload_group/workload_group_manager.h 
b/be/src/runtime/workload_group/workload_group_manager.h
index 6e51a1f3767..f84bf3a29ff 100644
--- a/be/src/runtime/workload_group/workload_group_manager.h
+++ b/be/src/runtime/workload_group/workload_group_manager.h
@@ -44,8 +44,9 @@ public:
     std::weak_ptr<QueryContext> query_ctx_;
     std::chrono::system_clock::time_point enqueue_at;
     size_t last_mem_usage {0};
+    double cache_ratio_ {0.0};
 
-    PausedQuery(std::shared_ptr<QueryContext> query_ctx);
+    PausedQuery(std::shared_ptr<QueryContext> query_ctx, double cache_ratio);
 
     int64_t elapsed_time() const {
         auto now = std::chrono::system_clock::now();
@@ -101,6 +102,9 @@ public:
 
     void update_load_memtable_usage(const std::map<uint64_t, MemtableUsage>& 
wg_memtable_usages);
 
+private:
+    bool spill_or_cancel_query(std::shared_ptr<QueryContext> query_ctx, Status 
paused_reason);
+
 private:
     std::shared_mutex _group_mutex;
     std::unordered_map<uint64_t, WorkloadGroupPtr> _workload_groups;


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to