This is an automated email from the ASF dual-hosted git repository.

mrhhsg pushed a commit to branch spill_repartition
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/spill_repartition by this push:
     new 35f2c55989f [pipeline] Proactively pause query for spill under memory pressure in PipelineTask
35f2c55989f is described below

commit 35f2c55989fdaaa17ef1c845ac682856a497b063
Author: Hu Shenggang <[email protected]>
AuthorDate: Sat Feb 28 16:20:11 2026 +0800

    [pipeline] Proactively pause query for spill under memory pressure in PipelineTask
    
    - add `_should_trigger_revoking(reserve_size)` to detect high memory pressure from query usage and workload group watermark
    - trigger `add_paused_query(..., QUERY_MEMORY_EXCEEDED)` before reserve attempt when revocable memory is significant
    - apply `low_memory_mode` to all operators in pipeline (instead of only root)
    - refine reserve path in `_try_to_reserve_memory()`:
      - compute revocable memory only when reserve fails or `enable_force_spill` is on
      - keep force-spill guard based on max operator revocable size
    - remove temporary debug logs in execute/reserve path
    - minor cleanup: use `const auto&` iteration in `get_revocable_size()`
---
 be/src/pipeline/pipeline_task.cpp | 133 ++++++++++++++++++++++++++++++++------
 be/src/pipeline/pipeline_task.h   |   1 +
 2 files changed, 115 insertions(+), 19 deletions(-)

diff --git a/be/src/pipeline/pipeline_task.cpp 
b/be/src/pipeline/pipeline_task.cpp
index 93e0c9a1485..0ba4beee496 100644
--- a/be/src/pipeline/pipeline_task.cpp
+++ b/be/src/pipeline/pipeline_task.cpp
@@ -367,6 +367,66 @@ void PipelineTask::terminate() {
     }
 }
 
+// When current memory pressure is low, memory usage may increase 
significantly in the next
+// operator run, while there is no revocable memory available for spilling.
+// Trigger memory revoking when pressure is high and revocable memory is 
significant.
+// Memory pressure is evaluated using two signals:
+// 1. Query memory usage exceeds a threshold ratio of the query memory limit.
+// 2. Workload group memory usage reaches the workload group low-watermark 
threshold.
+bool PipelineTask::_should_trigger_revoking(const size_t reserve_size) const {
+    if (!_state->enable_spill()) {
+        return false;
+    }
+
+    auto query_mem_tracker = _state->get_query_ctx()->query_mem_tracker();
+    auto wg = _state->get_query_ctx()->workload_group();
+    if (!query_mem_tracker || !wg) {
+        return false;
+    }
+
+    const auto parallelism = std::max(1, _pipeline->num_tasks());
+    const auto water_mark = std::max(std::min(wg->memory_low_watermark(), 50), 
10);
+    const auto group_mem_limit = wg->memory_limit();
+    auto query_limit = query_mem_tracker->limit();
+    if (query_limit <= 0) {
+        query_limit = group_mem_limit;
+    } else if (query_limit > group_mem_limit && group_mem_limit > 0) {
+        query_limit = group_mem_limit;
+    }
+
+    if (query_limit <= 0) {
+        return false;
+    }
+
+    bool is_high_memory_pressure = false;
+    const auto used_mem = query_mem_tracker->consumption() + reserve_size * 
parallelism;
+    if (used_mem >= int64_t((double(query_limit) * water_mark / 100))) {
+        is_high_memory_pressure = true;
+    }
+
+    if (!is_high_memory_pressure) {
+        bool is_low_watermark;
+        bool is_high_watermark;
+        wg->check_mem_used(&is_low_watermark, &is_high_watermark);
+        is_high_memory_pressure = is_low_watermark || is_high_watermark;
+    }
+
+    if (is_high_memory_pressure) {
+        const auto revocable_size = [&]() {
+            size_t total = _sink->revocable_mem_size(_state);
+            for (const auto& op : _operators) {
+                total += op->revocable_mem_size(_state);
+            }
+            return total;
+        }();
+
+        const auto total_estimated_revocable = revocable_size * parallelism;
+        return total_estimated_revocable >= int64_t(double(query_limit) * 0.2);
+    }
+
+    return false;
+}
+
 /**
  * `_eos` indicates whether the execution phase is done. `done` indicates 
whether we could close
  * this task.
@@ -514,7 +574,9 @@ Status PipelineTask::execute(bool* done) {
             SCOPED_TIMER(_get_block_timer);
             if (_state->low_memory_mode()) {
                 _sink->set_low_memory_mode(_state);
-                _root->set_low_memory_mode(_state);
+                for (auto& op : _operators) {
+                    op->set_low_memory_mode(_state);
+                }
             }
             DEFER_RELEASE_RESERVED();
             _get_block_counter->update(1);
@@ -525,14 +587,26 @@ Status PipelineTask::execute(bool* done) {
                 reserve_size += op->get_reserve_mem_size(_state);
                 op->reset_reserve_mem_size(_state);
             }
-            LOG(INFO) << "1 " << reserve_size;
             if (workload_group &&
                 _state->get_query_ctx()
                         ->resource_ctx()
                         ->task_controller()
                         ->is_enable_reserve_memory() &&
                 reserve_size > 0) {
-                LOG(INFO) << "11 " << reserve_size;
+                if (_should_trigger_revoking(reserve_size)) {
+                    LOG(INFO) << fmt::format(
+                            "Query: {} sink: {}, node id: {}, task id: "
+                            "{} when high memory pressure, try to spill",
+                            print_id(_query_id), _sink->get_name(), 
_sink->node_id(),
+                            _state->task_id());
+                    
ExecEnv::GetInstance()->workload_group_mgr()->add_paused_query(
+                            
_state->get_query_ctx()->resource_ctx()->shared_from_this(),
+                            reserve_size,
+                            Status::Error<ErrorCode::QUERY_MEMORY_EXCEEDED>(
+                                    "high memory pressure, try to spill"));
+                    _spilling = true;
+                    continue;
+                }
                 if (!_try_to_reserve_memory(reserve_size, _root)) {
                     continue;
                 }
@@ -555,7 +629,21 @@ Status PipelineTask::execute(bool* done) {
                 workload_group && !(_wake_up_early || _dry_run)) {
                 const auto sink_reserve_size = 
_sink->get_reserve_mem_size(_state, _eos);
 
-                LOG(INFO) << "2 " << sink_reserve_size;
+                if (sink_reserve_size > 0 && 
_should_trigger_revoking(sink_reserve_size)) {
+                    LOG(INFO) << fmt::format(
+                            "Query: {} sink: {}, node id: {}, task id: "
+                            "{} when high memory pressure, try to spill",
+                            print_id(_query_id), _sink->get_name(), 
_sink->node_id(),
+                            _state->task_id());
+                    
ExecEnv::GetInstance()->workload_group_mgr()->add_paused_query(
+                            
_state->get_query_ctx()->resource_ctx()->shared_from_this(),
+                            sink_reserve_size,
+                            Status::Error<ErrorCode::QUERY_MEMORY_EXCEEDED>(
+                                    "high memory pressure, try to spill"));
+                    _spilling = true;
+                    continue;
+                }
+
                 if (sink_reserve_size > 0 &&
                     !_try_to_reserve_memory(sink_reserve_size, _sink.get())) {
                     continue;
@@ -666,7 +754,6 @@ Status PipelineTask::do_revoke_memory(const 
std::shared_ptr<SpillContext>& spill
 }
 
 bool PipelineTask::_try_to_reserve_memory(const size_t reserve_size, 
OperatorBase* op) {
-    LOG(INFO) << "111 " << reserve_size;
     auto st = 
thread_context()->thread_mem_tracker_mgr->try_reserve(reserve_size);
     // If reserve memory failed and the query is not enable spill, just 
disable reserve memory(this will enable
     // memory hard limit check, and will cancel the query if allocate memory 
failed) and let it run.
@@ -678,24 +765,32 @@ bool PipelineTask::_try_to_reserve_memory(const size_t 
reserve_size, OperatorBas
     }
     COUNTER_UPDATE(_memory_reserve_times, 1);
     // Compute total revocable memory across all operators and the sink.
-    size_t total_revocable_mem_size = _sink->revocable_mem_size(_state);
-    size_t operator_max_revocable_mem_size = total_revocable_mem_size;
-    for (auto& cur_op : _operators) {
-        total_revocable_mem_size += cur_op->revocable_mem_size(_state);
-        if (cur_op->revocable_mem_size(_state) > 
operator_max_revocable_mem_size) {
-            operator_max_revocable_mem_size = 
cur_op->revocable_mem_size(_state);
+    size_t total_revocable_mem_size = 0;
+    size_t operator_max_revocable_mem_size = 0;
+
+    if (!st.ok() || _state->enable_force_spill()) {
+        // Compute total revocable memory across all operators and the sink.
+        total_revocable_mem_size = _sink->revocable_mem_size(_state);
+        operator_max_revocable_mem_size = total_revocable_mem_size;
+        for (auto& cur_op : _operators) {
+            total_revocable_mem_size += cur_op->revocable_mem_size(_state);
+            operator_max_revocable_mem_size =
+                    std::max(cur_op->revocable_mem_size(_state), 
operator_max_revocable_mem_size);
         }
     }
+
     // During enable force spill, other operators like scan opeartor will also 
try to reserve memory and will failed
     // here, if not add this check, it will always paused and resumed again.
-    if (st.ok() && _state->enable_force_spill() &&
-        operator_max_revocable_mem_size >= 
_state->minimum_operator_memory_required_bytes()) {
-        st = Status::Error<ErrorCode::QUERY_MEMORY_EXCEEDED>(
-                "force spill and there is an operator has memory "
-                "size {} exceeds min mem size {}",
-                PrettyPrinter::print_bytes(operator_max_revocable_mem_size),
-                
PrettyPrinter::print_bytes(_state->minimum_operator_memory_required_bytes()));
+    if (st.ok() && _state->enable_force_spill()) {
+        if (operator_max_revocable_mem_size >= 
_state->minimum_operator_memory_required_bytes()) {
+            st = Status::Error<ErrorCode::QUERY_MEMORY_EXCEEDED>(
+                    "force spill and there is an operator has memory "
+                    "size {} exceeds min mem size {}",
+                    
PrettyPrinter::print_bytes(operator_max_revocable_mem_size),
+                    
PrettyPrinter::print_bytes(_state->minimum_operator_memory_required_bytes()));
+        }
     }
+
     if (!st.ok()) {
         COUNTER_UPDATE(_memory_reserve_failed_times, 1);
         // build per-operator revocable memory info string for debugging
@@ -880,7 +975,7 @@ size_t PipelineTask::get_revocable_size() const {
     // Sum revocable memory from every operator in the pipeline + the sink.
     // Each operator reports only its own revocable memory (no child 
recursion).
     size_t total = _sink->revocable_mem_size(_state);
-    for (auto& op : _operators) {
+    for (const auto& op : _operators) {
         total += op->revocable_mem_size(_state);
     }
     return total;
diff --git a/be/src/pipeline/pipeline_task.h b/be/src/pipeline/pipeline_task.h
index e2d51858be4..196a52fbe9e 100644
--- a/be/src/pipeline/pipeline_task.h
+++ b/be/src/pipeline/pipeline_task.h
@@ -201,6 +201,7 @@ private:
     // Operator `op` try to reserve memory before executing. Return false if 
reserve failed
     // otherwise return true.
     bool _try_to_reserve_memory(const size_t reserve_size, OperatorBase* op);
+    bool _should_trigger_revoking(const size_t reserve_size) const;
 
     const TUniqueId _query_id;
     const uint32_t _index;


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to