This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch spill_and_reserve
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/spill_and_reserve by this push:
     new 7f8a9c4c9c8 support stream load (#41351)
7f8a9c4c9c8 is described below

commit 7f8a9c4c9c80ecb443f46ed7876025767eb4d087
Author: yiguolei <[email protected]>
AuthorDate: Thu Sep 26 16:44:55 2024 +0800

    support stream load (#41351)
    
    ## Proposed changes
    
    Issue Number: close #xxx
    
    <!--Describe your changes.-->
    
    ---------
    
    Co-authored-by: yiguolei <[email protected]>
---
 be/src/common/config.cpp                           |   2 +-
 be/src/pipeline/pipeline_task.cpp                  |   6 +-
 be/src/runtime/memory/global_memory_arbitrator.cpp |   1 +
 be/src/runtime/memory/global_memory_arbitrator.h   |   1 +
 be/src/runtime/memory/thread_mem_tracker_mgr.h     |  31 +-
 be/src/runtime/query_context.cpp                   |   5 +
 be/src/runtime/query_context.h                     |   7 +
 be/src/runtime/workload_group/workload_group.cpp   |  22 ++
 be/src/runtime/workload_group/workload_group.h     |  38 +--
 .../workload_group/workload_group_manager.cpp      | 355 ++++++++++++++-------
 .../workload_group/workload_group_manager.h        |  12 +-
 11 files changed, 317 insertions(+), 163 deletions(-)

diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp
index 78e68c9de64..7e9ff8d8801 100644
--- a/be/src/common/config.cpp
+++ b/be/src/common/config.cpp
@@ -96,7 +96,7 @@ DEFINE_String(mem_limit, "90%");
 DEFINE_Double(soft_mem_limit_frac, "0.9");
 
 // Cache capacity reduce mem limit as a fraction of soft mem limit.
-DEFINE_mDouble(cache_capacity_reduce_mem_limit_frac, "0.6");
+DEFINE_mDouble(cache_capacity_reduce_mem_limit_frac, "0.7");
 
 // Schema change memory limit as a fraction of soft memory limit.
 DEFINE_Double(schema_change_mem_limit_frac, "0.6");
diff --git a/be/src/pipeline/pipeline_task.cpp 
b/be/src/pipeline/pipeline_task.cpp
index 4a1e4536373..0028614b22a 100644
--- a/be/src/pipeline/pipeline_task.cpp
+++ b/be/src/pipeline/pipeline_task.cpp
@@ -367,7 +367,7 @@ Status PipelineTask::execute(bool* eos) {
 
         // `_dry_run` means sink operator need no more data
         // `_sink->is_finished(_state)` means sink operator should be finished
-        size_t reserve_size = 0;
+        int64_t reserve_size = 0;
         bool has_enough_memory = true;
         if (_dry_run || _sink->is_finished(_state)) {
             *eos = true;
@@ -401,7 +401,7 @@ Status PipelineTask::execute(bool* eos) {
                     if (is_low_wartermark || is_high_wartermark) {
                         _memory_sufficient_dependency->block();
                         
ExecEnv::GetInstance()->workload_group_mgr()->add_paused_query(
-                                _state->get_query_ctx()->shared_from_this());
+                                _state->get_query_ctx()->shared_from_this(), 
reserve_size);
                         continue;
                     }
                     has_enough_memory = false;
@@ -439,7 +439,7 @@ Status PipelineTask::execute(bool* eos) {
                       << ", insufficient memory. reserve_size: " << 
reserve_size;
             _memory_sufficient_dependency->block();
             ExecEnv::GetInstance()->workload_group_mgr()->add_paused_query(
-                    _state->get_query_ctx()->shared_from_this());
+                    _state->get_query_ctx()->shared_from_this(), reserve_size);
             break;
         }
     }
diff --git a/be/src/runtime/memory/global_memory_arbitrator.cpp 
b/be/src/runtime/memory/global_memory_arbitrator.cpp
index 0c774187ff3..2b649efba50 100644
--- a/be/src/runtime/memory/global_memory_arbitrator.cpp
+++ b/be/src/runtime/memory/global_memory_arbitrator.cpp
@@ -44,6 +44,7 @@ std::atomic<double> 
GlobalMemoryArbitrator::last_cache_capacity_adjust_weighted
 std::atomic<double> 
GlobalMemoryArbitrator::last_wg_trigger_cache_capacity_adjust_weighted {1};
 // The value that take affect
 std::atomic<double> 
GlobalMemoryArbitrator::last_affected_cache_capacity_adjust_weighted {1};
+std::atomic<bool> GlobalMemoryArbitrator::any_workload_group_exceed_limit 
{false};
 std::mutex GlobalMemoryArbitrator::memtable_memory_refresh_lock;
 std::condition_variable GlobalMemoryArbitrator::memtable_memory_refresh_cv;
 std::atomic<bool> GlobalMemoryArbitrator::memtable_memory_refresh_notify 
{false};
diff --git a/be/src/runtime/memory/global_memory_arbitrator.h 
b/be/src/runtime/memory/global_memory_arbitrator.h
index 468d442b662..e2b55c8aa98 100644
--- a/be/src/runtime/memory/global_memory_arbitrator.h
+++ b/be/src/runtime/memory/global_memory_arbitrator.h
@@ -182,6 +182,7 @@ public:
     static std::atomic<double> last_wg_trigger_cache_capacity_adjust_weighted;
     // The value that take affect
     static std::atomic<double> last_affected_cache_capacity_adjust_weighted;
+    static std::atomic<bool> any_workload_group_exceed_limit;
 
     static void notify_cache_adjust_capacity() {
         cache_adjust_capacity_notify.store(true, std::memory_order_relaxed);
diff --git a/be/src/runtime/memory/thread_mem_tracker_mgr.h 
b/be/src/runtime/memory/thread_mem_tracker_mgr.h
index 15a57528491..c9f85258d5b 100644
--- a/be/src/runtime/memory/thread_mem_tracker_mgr.h
+++ b/be/src/runtime/memory/thread_mem_tracker_mgr.h
@@ -286,18 +286,27 @@ inline doris::Status 
ThreadMemTrackerMgr::try_reserve(int64_t size) {
     // _untracked_mem store bytes that not synchronized to process reserved 
memory.
     flush_untracked_mem();
     auto wg_ptr = _wg_wptr.lock();
-    // If wg not exist or wg enable overcommit, then query's memlimit is not 
considered.
-    if (wg_ptr != nullptr && !wg_ptr->enable_memory_overcommit()) {
-        if (!_limiter_tracker->try_reserve(size)) {
-            auto err_msg = fmt::format(
-                    "reserve memory failed, size: {}, because memory tracker 
consumption: {}, "
-                    "limit: "
-                    "{}",
-                    size, _limiter_tracker->consumption(), 
_limiter_tracker->limit());
-            return 
doris::Status::Error<ErrorCode::QUERY_MEMORY_EXCEEDED>(err_msg);
+    // For wg with overcommit, the limit will only take effect when memory > 
soft limit
+    // wg mgr will change wg's hard limit property.
+    if (wg_ptr != nullptr && wg_ptr->enable_memory_overcommit() &&
+        !wg_ptr->has_changed_to_hard_limit()) {
+        // TODO: Only do a check here, do not actually reserve. If we could 
reserve it, it would be better, but the logic is too complicated.
+        if (!doris::GlobalMemoryArbitrator::try_reserve_process_memory(size)) {
+            return doris::Status::Error<ErrorCode::PROCESS_MEMORY_EXCEEDED>(
+                    "reserve memory failed, size: {}, because {}", size,
+                    GlobalMemoryArbitrator::process_mem_log_str());
+        } else {
+            
doris::GlobalMemoryArbitrator::release_process_reserved_memory(size);
+            return Status::OK();
         }
-    } else {
-        _limiter_tracker->reserve(size);
+    }
+    if (!_limiter_tracker->try_reserve(size)) {
+        auto err_msg = fmt::format(
+                "reserve memory failed, size: {}, because memory tracker 
consumption: {}, "
+                "limit: "
+                "{}",
+                size, _limiter_tracker->consumption(), 
_limiter_tracker->limit());
+        return doris::Status::Error<ErrorCode::QUERY_MEMORY_EXCEEDED>(err_msg);
     }
     if (wg_ptr) {
         if (!wg_ptr->add_wg_refresh_interval_memory_growth(size)) {
diff --git a/be/src/runtime/query_context.cpp b/be/src/runtime/query_context.cpp
index 45fd5562a93..7227b5704d8 100644
--- a/be/src/runtime/query_context.cpp
+++ b/be/src/runtime/query_context.cpp
@@ -129,6 +129,11 @@ void QueryContext::_init_query_mem_tracker() {
                     << " OR is -1. Using process memory limit instead.";
         bytes_limit = MemInfo::mem_limit();
     }
+    // If the query is a pure load task(streamload, routine load, group 
commit), then it should not use
+    // memlimit per query to limit their memory usage.
+    if (is_pure_load_task()) {
+        bytes_limit = MemInfo::mem_limit();
+    }
     if (_query_options.query_type == TQueryType::SELECT) {
         query_mem_tracker = MemTrackerLimiter::create_shared(
                 MemTrackerLimiter::Type::QUERY, fmt::format("Query#Id={}", 
print_id(_query_id)),
diff --git a/be/src/runtime/query_context.h b/be/src/runtime/query_context.h
index b74b835af63..c71b7d4f85a 100644
--- a/be/src/runtime/query_context.h
+++ b/be/src/runtime/query_context.h
@@ -303,6 +303,7 @@ public:
             bool is_low_wartermark = false;
             bool is_high_wartermark = false;
             _workload_group->check_mem_used(&is_low_wartermark, 
&is_high_wartermark);
+            // Even if the wg has not enabled hard limit, this will also take effect 
to lower the memory usage.
             if (is_high_wartermark) {
                 LOG(INFO)
                         << "Query " << print_id(_query_id)
@@ -349,6 +350,12 @@ public:
         return _paused_reason;
     }
 
+    bool is_pure_load_task() {
+        return _query_source == QuerySource::STREAM_LOAD ||
+               _query_source == QuerySource::ROUTINE_LOAD ||
+               _query_source == QuerySource::GROUP_COMMIT_LOAD;
+    }
+
 private:
     int _timeout_second;
     TUniqueId _query_id;
diff --git a/be/src/runtime/workload_group/workload_group.cpp 
b/be/src/runtime/workload_group/workload_group.cpp
index e0438a9d729..bca2f8401de 100644
--- a/be/src/runtime/workload_group/workload_group.cpp
+++ b/be/src/runtime/workload_group/workload_group.cpp
@@ -114,6 +114,28 @@ std::string WorkloadGroup::debug_string() const {
             _remote_scan_bytes_per_second);
 }
 
+bool WorkloadGroup::add_wg_refresh_interval_memory_growth(int64_t size) {
+    // If a group enables memory overcommit, there is no need to check its limit.
+    // It always succeeds, and it will only fail when process memory is not
+    // enough.
+    if (_enable_memory_overcommit) {
+        if (doris::GlobalMemoryArbitrator::is_exceed_soft_mem_limit(size)) {
+            return false;
+        } else {
+            return true;
+        }
+    }
+    auto realtime_total_mem_used =
+            _total_mem_used + _wg_refresh_interval_memory_growth.load() + size;
+    if ((realtime_total_mem_used >
+         ((double)_memory_limit * 
_spill_high_watermark.load(std::memory_order_relaxed) / 100))) {
+        return false;
+    } else {
+        _wg_refresh_interval_memory_growth.fetch_add(size);
+        return true;
+    }
+}
+
 std::string WorkloadGroup::memory_debug_string() const {
     auto realtime_total_mem_used = _total_mem_used + 
_wg_refresh_interval_memory_growth.load();
     auto mem_used_ratio = realtime_total_mem_used / 
((double)_weighted_memory_limit + 1);
diff --git a/be/src/runtime/workload_group/workload_group.h 
b/be/src/runtime/workload_group/workload_group.h
index 1d640afee1d..094258caee5 100644
--- a/be/src/runtime/workload_group/workload_group.h
+++ b/be/src/runtime/workload_group/workload_group.h
@@ -76,9 +76,9 @@ public:
     int64_t memory_limit() const {
         std::shared_lock<std::shared_mutex> r_lock(_mutex);
         return _memory_limit;
-    };
+    }
 
-    int64_t weighted_memory_limit() const { return _weighted_memory_limit; };
+    int64_t total_mem_used() const { return _total_mem_used; }
 
     void set_weighted_memory_limit(int64_t weighted_memory_limit) {
         _weighted_memory_limit = weighted_memory_limit;
@@ -103,24 +103,7 @@ public:
         return _total_query_slot_count.load(std::memory_order_relaxed);
     }
 
-    bool add_wg_refresh_interval_memory_growth(int64_t size) {
-        // If a group is enable memory overcommit, then not need check the 
limit
-        // It is always true, and it will only fail when process memory is not
-        // enough.
-        if (_enable_memory_overcommit) {
-            return true;
-        }
-        auto realtime_total_mem_used =
-                _total_mem_used + _wg_refresh_interval_memory_growth.load() + 
size;
-        if ((realtime_total_mem_used >
-             ((double)_weighted_memory_limit *
-              _spill_high_watermark.load(std::memory_order_relaxed) / 100))) {
-            return false;
-        } else {
-            _wg_refresh_interval_memory_growth.fetch_add(size);
-            return true;
-        }
-    }
+    bool add_wg_refresh_interval_memory_growth(int64_t size);
 
     void sub_wg_refresh_interval_memory_growth(int64_t size) {
         _wg_refresh_interval_memory_growth.fetch_sub(size);
@@ -129,10 +112,10 @@ public:
     void check_mem_used(bool* is_low_wartermark, bool* is_high_wartermark) 
const {
         auto realtime_total_mem_used = _total_mem_used + 
_wg_refresh_interval_memory_growth.load();
         *is_low_wartermark = (realtime_total_mem_used >
-                              ((double)_weighted_memory_limit *
+                              ((double)_memory_limit *
                                
_spill_low_watermark.load(std::memory_order_relaxed) / 100));
         *is_high_wartermark = (realtime_total_mem_used >
-                               ((double)_weighted_memory_limit *
+                               ((double)_memory_limit *
                                 
_spill_high_watermark.load(std::memory_order_relaxed) / 100));
     }
 
@@ -166,6 +149,11 @@ public:
         return _memory_limit > 0;
     }
 
+    bool exceed_limit() {
+        std::shared_lock<std::shared_mutex> r_lock(_mutex);
+        return _memory_limit > 0 ? _total_mem_used > _memory_limit : false;
+    }
+
     Status add_query(TUniqueId query_id, std::shared_ptr<QueryContext> 
query_ctx) {
         std::unique_lock<std::shared_mutex> wlock(_mutex);
         if (_is_shutdown) {
@@ -236,9 +224,9 @@ public:
 
     int64_t load_buffer_limit() { return _load_buffer_limit; }
 
-    void update_memory_sufficent(bool is_sufficient) { _is_sufficient = 
is_sufficient; }
+    bool has_changed_to_hard_limit() const { return _has_changed_hard_limit; }
 
-    bool memory_sufficent() const { return _is_sufficient; }
+    void change_to_hard_limit(bool to_hard_limit) { _has_changed_hard_limit = 
to_hard_limit; }
 
 private:
     mutable std::shared_mutex _mutex; // lock _name, _version, _cpu_share, 
_memory_limit
@@ -250,7 +238,7 @@ private:
     // If the wg's memory reached high water mark, then the load buffer
     // will be restricted to this limit.
     int64_t _load_buffer_limit;
-    std::atomic<bool> _is_sufficient = true;
+    std::atomic<bool> _has_changed_hard_limit = false;
 
     // memory used by load memtable
     int64_t _active_mem_usage = 0;
diff --git a/be/src/runtime/workload_group/workload_group_manager.cpp 
b/be/src/runtime/workload_group/workload_group_manager.cpp
index 92235c1ded7..e190499ec83 100644
--- a/be/src/runtime/workload_group/workload_group_manager.cpp
+++ b/be/src/runtime/workload_group/workload_group_manager.cpp
@@ -38,9 +38,12 @@
 
 namespace doris {
 
-PausedQuery::PausedQuery(std::shared_ptr<QueryContext> query_ctx, double 
cache_ratio)
+PausedQuery::PausedQuery(std::shared_ptr<QueryContext> query_ctx, double 
cache_ratio,
+                         bool any_wg_exceed_limit, int64_t reserve_size)
         : query_ctx_(query_ctx),
           cache_ratio_(cache_ratio),
+          any_wg_exceed_limit_(any_wg_exceed_limit),
+          reserve_size_(reserve_size),
           query_id_(print_id(query_ctx->query_id())) {
     enqueue_at = std::chrono::system_clock::now();
 }
@@ -159,11 +162,16 @@ void WorkloadGroupMgr::refresh_wg_weighted_memory_limit() 
{
     // and calculate total memory used of all queries.
     int64_t all_workload_groups_mem_usage = 0;
     std::unordered_map<uint64_t, WorkloadGroupMemInfo> wgs_mem_info;
+    bool has_wg_exceed_limit = false;
     for (auto& [wg_id, wg] : _workload_groups) {
         wgs_mem_info[wg_id].total_mem_used =
                 
wg->make_memory_tracker_snapshots(&wgs_mem_info[wg_id].tracker_snapshots);
         all_workload_groups_mem_usage += wgs_mem_info[wg_id].total_mem_used;
+        if (wg->exceed_limit()) {
+            has_wg_exceed_limit = true;
+        }
     }
+    doris::GlobalMemoryArbitrator::any_workload_group_exceed_limit = 
has_wg_exceed_limit;
     if (all_workload_groups_mem_usage <= 0) {
         return;
     }
@@ -204,105 +212,7 @@ void WorkloadGroupMgr::refresh_wg_weighted_memory_limit() 
{
             weighted_memory_limit_ratio);
     LOG_EVERY_T(INFO, 60) << debug_msg;
     for (auto& wg : _workload_groups) {
-        auto wg_mem_limit = wg.second->memory_limit();
-        auto wg_weighted_mem_limit = int64_t(wg_mem_limit * 
weighted_memory_limit_ratio);
-        wg.second->set_weighted_memory_limit(wg_weighted_mem_limit);
-        auto all_query_ctxs = wg.second->queries();
-        bool is_low_wartermark = false;
-        bool is_high_wartermark = false;
-        wg.second->check_mem_used(&is_low_wartermark, &is_high_wartermark);
-        int64_t wg_high_water_mark_limit =
-                (int64_t)(wg_mem_limit * 
wg.second->spill_threshold_high_water_mark() * 1.0 / 100);
-        int64_t weighted_high_water_mark_limit =
-                (int64_t)(wg_weighted_mem_limit * 
wg.second->spill_threshold_high_water_mark() *
-                          1.0 / 100);
-        std::string debug_msg;
-        if (is_high_wartermark || is_low_wartermark) {
-            debug_msg = fmt::format(
-                    "\nWorkload Group {}: mem limit: {}, mem used: {}, 
weighted mem limit: {}, "
-                    "high water mark mem limit: {}, used ratio: {}",
-                    wg.second->name(),
-                    PrettyPrinter::print(wg.second->memory_limit(), 
TUnit::BYTES),
-                    
PrettyPrinter::print(wgs_mem_info[wg.first].total_mem_used, TUnit::BYTES),
-                    PrettyPrinter::print(wg_weighted_mem_limit, TUnit::BYTES),
-                    PrettyPrinter::print(weighted_high_water_mark_limit, 
TUnit::BYTES),
-                    (double)wgs_mem_info[wg.first].total_mem_used / 
wg_weighted_mem_limit);
-
-            debug_msg += "\n  Query Memory Summary:";
-            // check whether queries need to revoke memory for task group
-            for (const auto& query_mem_tracker : 
wgs_mem_info[wg.first].tracker_snapshots) {
-                debug_msg += fmt::format(
-                        "\n    MemTracker Label={}, Used={}, MemLimit={}, "
-                        "Peak={}",
-                        query_mem_tracker->label(),
-                        PrettyPrinter::print(query_mem_tracker->consumption(), 
TUnit::BYTES),
-                        PrettyPrinter::print(query_mem_tracker->limit(), 
TUnit::BYTES),
-                        
PrettyPrinter::print(query_mem_tracker->peak_consumption(), TUnit::BYTES));
-            }
-        }
-
-        // If the wg enable over commit memory, then it is no need to update 
query memlimit
-        if (wg.second->enable_memory_overcommit()) {
-            continue;
-        }
-        int32_t total_used_slot_count = 0;
-        int32_t total_slot_count = wg.second->total_query_slot_count();
-        // calculate total used slot count
-        for (const auto& query : all_query_ctxs) {
-            auto query_ctx = query.second.lock();
-            if (!query_ctx) {
-                continue;
-            }
-            total_used_slot_count += query_ctx->get_slot_count();
-        }
-        // calculate per query weighted memory limit
-        debug_msg = "Query Memory Summary:";
-        for (const auto& query : all_query_ctxs) {
-            auto query_ctx = query.second.lock();
-            if (!query_ctx) {
-                continue;
-            }
-            int64_t query_weighted_mem_limit = 0;
-            // If the query enable hard limit, then it should not use the soft 
limit
-            if (query_ctx->enable_query_slot_hard_limit()) {
-                if (total_slot_count < 1) {
-                    LOG(WARNING)
-                            << "query " << print_id(query_ctx->query_id())
-                            << " enabled hard limit, but the slot count < 1, 
could not take affect";
-                } else {
-                    // If the query enable hard limit, then not use weighted 
info any more, just use the settings limit.
-                    query_weighted_mem_limit = 
(int64_t)((wg_high_water_mark_limit *
-                                                          
query_ctx->get_slot_count() * 1.0) /
-                                                         total_slot_count);
-                }
-            } else {
-                // If low water mark is not reached, then use process memory 
limit as query memory limit.
-                // It means it will not take effect.
-                // If there are some query in paused list, then limit should 
take effect.
-                if (!is_low_wartermark && wg.second->memory_sufficent()) {
-                    query_weighted_mem_limit = wg_high_water_mark_limit;
-                } else {
-                    query_weighted_mem_limit =
-                            total_used_slot_count > 0
-                                    ? (int64_t)((wg_high_water_mark_limit + 
total_used_slot_count) *
-                                                query_ctx->get_slot_count() * 
1.0 /
-                                                total_used_slot_count)
-                                    : wg_high_water_mark_limit;
-                }
-            }
-            debug_msg += fmt::format(
-                    "\n    MemTracker Label={}, Used={}, Limit={}, Peak={}",
-                    query_ctx->get_mem_tracker()->label(),
-                    
PrettyPrinter::print(query_ctx->get_mem_tracker()->consumption(), TUnit::BYTES),
-                    PrettyPrinter::print(query_weighted_mem_limit, 
TUnit::BYTES),
-                    
PrettyPrinter::print(query_ctx->get_mem_tracker()->peak_consumption(),
-                                         TUnit::BYTES));
-
-            query_ctx->set_mem_limit(query_weighted_mem_limit);
-        }
-        // During memory insufficent stage, we already set every query's 
memlimit, so that the flag is useless any more.
-        wg.second->update_memory_sufficent(true);
-        LOG_EVERY_T(INFO, 60) << debug_msg;
+        change_query_to_hard_limit(wg.second, false);
     }
 }
 
@@ -330,26 +240,44 @@ void 
WorkloadGroupMgr::get_wg_resource_usage(vectorized::Block* block) {
     }
 }
 
-void WorkloadGroupMgr::add_paused_query(const std::shared_ptr<QueryContext>& 
query_ctx) {
+void WorkloadGroupMgr::add_paused_query(const std::shared_ptr<QueryContext>& 
query_ctx,
+                                        int64_t reserve_size) {
     std::lock_guard<std::mutex> lock(_paused_queries_lock);
     DCHECK(query_ctx != nullptr);
     auto wg = query_ctx->workload_group();
     auto&& [it, inserted] = _paused_queries_list[wg].emplace(
-            query_ctx, 
doris::GlobalMemoryArbitrator::last_affected_cache_capacity_adjust_weighted);
+            query_ctx, 
doris::GlobalMemoryArbitrator::last_affected_cache_capacity_adjust_weighted,
+            doris::GlobalMemoryArbitrator::any_workload_group_exceed_limit, 
reserve_size);
+    // Check if this is an invalid reserve, for example, if the reserve size 
is too large, larger than the query limit
+    // if hard limit is enabled, then there is no need to enable hard limit for other queries.
     if (inserted) {
         LOG(INFO) << "workload group " << wg->debug_string()
                   << " insert one new paused query: " << it->query_id();
     }
 }
 
+/**
+ * 1. When Process's memory is lower than soft limit, then all workload groups 
will be converted to hard limit (Exception: there is only one workload group).
+ * 2. Reserve logic for a workload group that is soft limit takes no effect, it 
will always return success.
+ * 3. QueryLimit for streamload, routineload and group commit takes no effect, it 
will always return success, but the workload group's hard limit will take effect.
+ * 4. See handle_non_overcommit_wg_paused_queries for hard limit logic.
+ */
+void WorkloadGroupMgr::handle_paused_queries() {
+    handle_non_overcommit_wg_paused_queries();
+    handle_overcommit_wg_paused_queries();
+}
+
 /**
  * Strategy 1: A revocable query should not have any running 
task(PipelineTask).
  * strategy 2: If the workload group has any task exceed workload group 
memlimit, then set all queryctx's memlimit
  * strategy 3: If any query exceed process memlimit, then should clear all 
caches.
  * strategy 4: If any query exceed query's memlimit, then do spill disk or 
cancel it.
- * strategy 5: If any query exceed process's memlimit and cache is zero, then 
do spill disk or cancel it.
+ * strategy 5: If any query exceeds process's memlimit and cache is zero, then 
do the following:
+ * 1. cancel other wg's (soft limit) queries that exceed limit
+ * 2. spill to disk
+ * 3. cancel itself.
  */
-void WorkloadGroupMgr::handle_paused_queries() {
+void WorkloadGroupMgr::handle_non_overcommit_wg_paused_queries() {
     const int64_t TIMEOUT_IN_QUEUE = 1000L * 10;
     std::unique_lock<std::mutex> lock(_paused_queries_lock);
     for (auto it = _paused_queries_list.begin(); it != 
_paused_queries_list.end();) {
@@ -358,16 +286,14 @@ void WorkloadGroupMgr::handle_paused_queries() {
         if (queries_list.empty()) {
             LOG(INFO) << "wg: " << wg->debug_string()
                       << " has no paused query, update it to memory sufficent";
-            wg->update_memory_sufficent(true);
             it = _paused_queries_list.erase(it);
             continue;
         }
-
         bool is_low_wartermark = false;
         bool is_high_wartermark = false;
 
         wg->check_mem_used(&is_low_wartermark, &is_high_wartermark);
-
+        bool has_changed_hard_limit = false;
         // If the query is paused because its limit exceed the query itself's 
memlimit, then just spill disk.
         // The query's memlimit is set using slot mechanism and its value is 
set using the user settings, not
         // by weighted value. So if reserve failed, then it is actually exceed 
limit.
@@ -379,17 +305,30 @@ void WorkloadGroupMgr::handle_paused_queries() {
                 continue;
             }
             if (query_ctx->is_cancelled()) {
-                /// Memory may not be released immediately after a query is 
canceled.
-                /// So here wait for a while.
-                if (query_it->elapsed_time() > TIMEOUT_IN_QUEUE) {
-                    LOG(INFO) << "query: " << print_id(query_ctx->query_id())
-                              << " was canceled, remove from paused list";
-                    query_it = queries_list.erase(query_it);
-                }
+                LOG(INFO) << "query: " << print_id(query_ctx->query_id())
+                          << " was canceled, remove from paused list";
+                query_it = queries_list.erase(query_it);
                 continue;
             }
+
+            // Only deal with non overcommit workload group.
+            if (wg->enable_memory_overcommit() && 
!wg->has_changed_to_hard_limit() &&
+                
!query_ctx->paused_reason().is<ErrorCode::PROCESS_MEMORY_EXCEEDED>()) {
+                // A soft limit wg will only fail to reserve when the process limit 
is exceeded. But in some corner cases,
+                // the wg is hard limit when reserving and the query's reserve 
fails, but when this loop runs
+                // the wg has been converted to soft limit.
+                // So the query should be resumed.
+                // So that should resume the query.
+                LOG(WARNING) << "query: " << print_id(query_ctx->query_id())
+                             << " reserve memory failed, but workload group 
not converted to hard "
+                                "limit, it should not happen, resume it again. 
paused reason: "
+                             << query_ctx->paused_reason();
+                query_ctx->set_memory_sufficient(true);
+                query_it = queries_list.erase(query_it);
+                continue;
+            }
+
             if 
(query_ctx->paused_reason().is<ErrorCode::QUERY_MEMORY_EXCEEDED>()) {
-                bool spill_res = spill_or_cancel_query(query_ctx, 
query_ctx->paused_reason());
+                bool spill_res = handle_single_query(query_ctx, 
query_ctx->paused_reason());
                 if (!spill_res) {
                     ++query_it;
                     continue;
@@ -398,14 +337,51 @@ void WorkloadGroupMgr::handle_paused_queries() {
                     continue;
                 }
             } else if 
(query_ctx->paused_reason().is<ErrorCode::WORKLOAD_GROUP_MEMORY_EXCEEDED>()) {
-                if (wg->memory_sufficent()) {
-                    wg->update_memory_sufficent(false);
+                if (!has_changed_hard_limit) {
+                    change_query_to_hard_limit(wg, true);
+                    has_changed_hard_limit = true;
                     LOG(INFO) << "query: " << print_id(query_ctx->query_id())
                               << " reserve memory failed due to workload group 
memory exceed, "
                                  "should set the workload group work in memory 
insufficent mode, "
                                  "so that other query will reduce their 
memory. wg: "
                               << wg->debug_string();
                 }
+                // If there is a lot of memtable memory, then wait for the flush 
to finish.
+                MemTableMemoryLimiter* memtable_limiter =
+                        
doris::ExecEnv::GetInstance()->memtable_memory_limiter();
+                // Not use memlimit, should use high water mark.
+                int64_t memtable_active_bytes = 0;
+                int64_t memtable_queue_bytes = 0;
+                int64_t memtable_flush_bytes = 0;
+                wg->get_load_mem_usage(&memtable_active_bytes, 
&memtable_queue_bytes,
+                                       &memtable_flush_bytes);
+                // TODO: should add a signal in memtable limiter to prevent 
new batch
+                // For example, streamload, it will not reserve much memory, 
but it will occupy a lot of memtable memory.
+                // TODO: 0.2 should be a workload group property. For 
example, if the group is optimized for load, then the value
+                // should be larger; if the group is optimized for query, then 
the value should be smaller.
+                int64_t max_wg_memtable_bytes = wg->load_buffer_limit();
+                if (memtable_active_bytes + memtable_queue_bytes + 
memtable_flush_bytes >
+                    max_wg_memtable_bytes) {
+                    // There are many memtables in the flush queue, just wait for them 
to finish flushing.
+                    if (memtable_active_bytes < 
(int64_t)(max_wg_memtable_bytes * 0.6)) {
+                        LOG_EVERY_T(INFO, 60)
+                                << wg->name() << " load memtable size is: " << 
memtable_active_bytes
+                                << ", " << memtable_queue_bytes << ", " << 
memtable_flush_bytes
+                                << ", load buffer limit is: " << 
max_wg_memtable_bytes
+                                << " wait for flush finished to release more 
memory";
+                        continue;
+                    } else {
+                        // Flush some memtables(currently written) to flush 
queue.
+                        memtable_limiter->flush_workload_group_memtables(
+                                wg->id(),
+                                memtable_active_bytes - 
(int64_t)(max_wg_memtable_bytes * 0.6));
+                        LOG_EVERY_T(INFO, 60)
+                                << wg->name() << " load memtable size is: " << 
memtable_active_bytes
+                                << ", " << memtable_queue_bytes << ", " << 
memtable_flush_bytes
+                                << ", flush some active memtable to revoke 
memory";
+                        continue;
+                    }
+                }
                 // Should not put the query back to task scheduler 
immediately, because when wg's memory not sufficient,
                 // and then set wg's flag, other query may not free memory 
very quickly.
                 if (query_it->elapsed_time() > TIMEOUT_IN_QUEUE) {
@@ -418,6 +394,8 @@ void WorkloadGroupMgr::handle_paused_queries() {
                 }
                 continue;
             } else {
+                // PROCESS reserve logic uses the hard limit; if we reach here, we 
should try to spill or cancel.
+                // GC logic also works at the hard limit, so GC may cancel some 
queries and we may not be able to spill here.
                 // If wg's memlimit not exceed, but process memory exceed, it 
means cache or other metadata
                 // used too much memory. Should clean all cache here.
                 // 1. Check cache used, if cache is larger than > 0, then just 
return and wait for it to 0 to release some memory.
@@ -433,7 +411,8 @@ void WorkloadGroupMgr::handle_paused_queries() {
                                  "to 0 now";
                 }
                 if (query_it->cache_ratio_ < 0.001) {
-                    bool spill_res = spill_or_cancel_query(query_ctx, 
query_ctx->paused_reason());
+                    // TODO: Find other exceed limit workload group and cancel 
query.
+                    bool spill_res = handle_single_query(query_ctx, 
query_ctx->paused_reason());
                     if (!spill_res) {
                         ++query_it;
                         continue;
@@ -460,8 +439,41 @@ void WorkloadGroupMgr::handle_paused_queries() {
     }
 }
 
-bool WorkloadGroupMgr::spill_or_cancel_query(std::shared_ptr<QueryContext> 
query_ctx,
-                                             Status paused_reason) {
+void WorkloadGroupMgr::handle_overcommit_wg_paused_queries() {
+    std::shared_lock<std::shared_mutex> r_lock(_group_mutex);
+    // If there is only one workload group (whether or not it is overcommit), then do 
nothing.
+    // NOTE(review): MinorGC logic should presumably also be started in that case.
+    if (_workload_groups.size() == 1) {
+        return;
+    }
+    if (doris::GlobalMemoryArbitrator::is_exceed_soft_mem_limit(100 * 1024 * 
1024)) {
+        for (auto& [wg_id, wg] : _workload_groups) {
+            if (wg->enable_memory_overcommit() && 
!wg->has_changed_to_hard_limit()) {
+                wg->change_to_hard_limit(true);
+                LOG(INFO) << "Process memory usage will exceed soft limit, 
change all workload "
+                             "group with overcommit to hard limit now. "
+                          << wg->debug_string();
+            }
+        }
+    }
+    // If memory usage is below (soft mem limit - 10% of mem limit), then re-enable wg's 
overcommit
+    if (!doris::GlobalMemoryArbitrator::is_exceed_soft_mem_limit(
+                (int64_t)(MemInfo::mem_limit() * 0.1))) {
+        for (auto& [wg_id, wg] : _workload_groups) {
+            if (wg->enable_memory_overcommit() && 
wg->has_changed_to_hard_limit()) {
+                wg->change_to_hard_limit(false);
+                LOG(INFO) << "Process memory usage is lower than soft limit, 
enable all workload "
+                             "group overcommit now. "
+                          << wg->debug_string();
+            }
+        }
+    }
+}
+// If the query could release some memory, for example, spill to disk or flush 
memtable, then the return value is true.
+// If the query could not release memory, then the query is cancelled and the return 
value is true.
+// If the query is not ready to do these tasks, it just waits (presumably returning false).
+bool WorkloadGroupMgr::handle_single_query(std::shared_ptr<QueryContext> 
query_ctx,
+                                           Status paused_reason) {
     // TODO: If the query is an insert into select query, should consider 
memtable as revoke memory.
     size_t revocable_size = 0;
     size_t memory_usage = 0;
@@ -494,6 +506,109 @@ bool 
WorkloadGroupMgr::spill_or_cancel_query(std::shared_ptr<QueryContext> query
     return true;
 }
 
+void WorkloadGroupMgr::change_query_to_hard_limit(WorkloadGroupPtr wg, bool 
enable_hard_limit) {
+    auto wg_mem_limit = wg->memory_limit();
+    auto wg_weighted_mem_limit = int64_t(wg_mem_limit * 1);
+    wg->set_weighted_memory_limit(wg_weighted_mem_limit);
+    auto all_query_ctxs = wg->queries();
+    bool is_low_wartermark = false;
+    bool is_high_wartermark = false;
+    wg->check_mem_used(&is_low_wartermark, &is_high_wartermark);
+    int64_t wg_high_water_mark_limit =
+            (int64_t)(wg_mem_limit * wg->spill_threshold_high_water_mark() * 
1.0 / 100);
+    int64_t memtable_active_bytes = 0;
+    int64_t memtable_queue_bytes = 0;
+    int64_t memtable_flush_bytes = 0;
+    wg->get_load_mem_usage(&memtable_active_bytes, &memtable_queue_bytes, 
&memtable_flush_bytes);
+    int64_t memtable_usage = memtable_active_bytes + memtable_queue_bytes + 
memtable_flush_bytes;
+    int64_t wg_high_water_mark_except_load = wg_high_water_mark_limit;
+    if (memtable_usage > wg->load_buffer_limit()) {
+        wg_high_water_mark_except_load = wg_high_water_mark_limit - 
wg->load_buffer_limit();
+    } else {
+        wg_high_water_mark_except_load =
+                wg_high_water_mark_limit - memtable_usage - 10 * 1024 * 1024;
+    }
+    std::string debug_msg;
+    if (is_high_wartermark || is_low_wartermark) {
+        debug_msg = fmt::format(
+                "\nWorkload Group {}: mem limit: {}, mem used: {}, weighted 
mem limit: {}, "
+                "high water mark mem limit: {}, load memtable usage: {}, used 
ratio: {}",
+                wg->name(), PrettyPrinter::print(wg->memory_limit(), 
TUnit::BYTES),
+                PrettyPrinter::print(wg->total_mem_used(), TUnit::BYTES),
+                PrettyPrinter::print(wg_weighted_mem_limit, TUnit::BYTES),
+                PrettyPrinter::print(wg_high_water_mark_limit, TUnit::BYTES),
+                PrettyPrinter::print(memtable_usage, TUnit::BYTES),
+                (double)(wg->total_mem_used()) / wg_weighted_mem_limit);
+    }
+
+    // If the wg enables memory overcommit (and is not forced to hard limit), no need to update query 
memlimit
+    if (wg->enable_memory_overcommit() && !wg->has_changed_to_hard_limit()) {
+        return;
+    }
+    int32_t total_used_slot_count = 0;
+    int32_t total_slot_count = wg->total_query_slot_count();
+    // calculate total used slot count
+    for (const auto& query : all_query_ctxs) {
+        auto query_ctx = query.second.lock();
+        if (!query_ctx) {
+            continue;
+        }
+        // Pure load tasks (stream load, kafka load, group commit) do not count as used slots
+        if (!query_ctx->is_pure_load_task()) {
+            total_used_slot_count += query_ctx->get_slot_count();
+        }
+    }
+    // calculate per query weighted memory limit; append to (do not overwrite) the wg summary
+    debug_msg += debug_msg.empty() ? "Query Memory Summary:" : "\nQuery Memory Summary:";
+    for (const auto& query : all_query_ctxs) {
+        auto query_ctx = query.second.lock();
+        if (!query_ctx) {
+            continue;
+        }
+        int64_t query_weighted_mem_limit = 0;
+        // If the query enables the slot hard limit, then it should not use the soft 
limit
+        if (query_ctx->enable_query_slot_hard_limit()) {
+            if (total_slot_count < 1) {
+                LOG(WARNING)
+                        << "query " << print_id(query_ctx->query_id())
+                        << " enabled hard limit, but the slot count < 1, could 
not take affect";
+            } else {
+                // If the query enables hard limit, then do not use weighted info 
any more, just use the settings limit.
+                query_weighted_mem_limit = 
(int64_t)((wg_high_water_mark_except_load *
+                                                      
query_ctx->get_slot_count() * 1.0) /
+                                                     total_slot_count);
+            }
+        } else {
+            // If low water mark is not reached, then use the wg high water mark (except load) 
as query memory limit.
+            // It means it will not take effect.
+            // If there are some queries in paused list, then the limit should take 
effect.
+            if (!is_low_wartermark && !enable_hard_limit) {
+                query_weighted_mem_limit = wg_high_water_mark_except_load;
+            } else {
+                query_weighted_mem_limit = total_used_slot_count > 0
+                                                   ? 
(int64_t)((wg_high_water_mark_except_load +
+                                                                
total_used_slot_count) *
+                                                               
query_ctx->get_slot_count() * 1.0 /
+                                                               
total_used_slot_count)
+                                                   : 
wg_high_water_mark_except_load;
+            }
+        }
+        debug_msg += fmt::format(
+                "\n    MemTracker Label={}, Used={}, Limit={}, Peak={}",
+                query_ctx->get_mem_tracker()->label(),
+                
PrettyPrinter::print(query_ctx->get_mem_tracker()->consumption(), TUnit::BYTES),
+                PrettyPrinter::print(query_weighted_mem_limit, TUnit::BYTES),
+                
PrettyPrinter::print(query_ctx->get_mem_tracker()->peak_consumption(),
+                                     TUnit::BYTES));
+        // If the query is a pure load task, then should not modify its limit. 
Or it will reserve
+        // memory failed and we did not handle it.
+        if (!query_ctx->is_pure_load_task()) {
+            query_ctx->set_mem_limit(query_weighted_mem_limit);
+        }
+    }
+    LOG_EVERY_T(INFO, 60) << debug_msg;
+}
+
 void WorkloadGroupMgr::stop() {
     for (auto iter = _workload_groups.begin(); iter != _workload_groups.end(); 
iter++) {
         iter->second->try_stop_schedulers();
diff --git a/be/src/runtime/workload_group/workload_group_manager.h 
b/be/src/runtime/workload_group/workload_group_manager.h
index f84bf3a29ff..8f69d5653b4 100644
--- a/be/src/runtime/workload_group/workload_group_manager.h
+++ b/be/src/runtime/workload_group/workload_group_manager.h
@@ -45,8 +45,11 @@ public:
     std::chrono::system_clock::time_point enqueue_at;
     size_t last_mem_usage {0};
     double cache_ratio_ {0.0};
+    bool any_wg_exceed_limit_ {false};
+    int64_t reserve_size_ {0};
 
-    PausedQuery(std::shared_ptr<QueryContext> query_ctx, double cache_ratio);
+    PausedQuery(std::shared_ptr<QueryContext> query_ctx, double cache_ratio,
+                bool any_wg_exceed_limit, int64_t reserve_size);
 
     int64_t elapsed_time() const {
         auto now = std::chrono::system_clock::now();
@@ -96,14 +99,17 @@ public:
 
     void get_wg_resource_usage(vectorized::Block* block);
 
-    void add_paused_query(const std::shared_ptr<QueryContext>& query_ctx);
+    void add_paused_query(const std::shared_ptr<QueryContext>& query_ctx, 
int64_t reserve_size);
 
     void handle_paused_queries();
 
     void update_load_memtable_usage(const std::map<uint64_t, MemtableUsage>& 
wg_memtable_usages);
 
 private:
-    bool spill_or_cancel_query(std::shared_ptr<QueryContext> query_ctx, Status 
paused_reason);
+    bool handle_single_query(std::shared_ptr<QueryContext> query_ctx, Status 
paused_reason);
+    void handle_non_overcommit_wg_paused_queries();
+    void handle_overcommit_wg_paused_queries();
+    void change_query_to_hard_limit(WorkloadGroupPtr wg, bool 
enable_hard_limit);
 
 private:
     std::shared_mutex _group_mutex;


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to