This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch branch-4.0
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-4.0 by this push:
     new 5d86ae6ccc9 branch-4.0: [enhance](memory) consider memtable memory when sink operator back pressure #60291 (#60502)
5d86ae6ccc9 is described below

commit 5d86ae6ccc96cd0b88394bca0514ee3fb1687276
Author: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Thu Feb 5 09:29:41 2026 +0800

    branch-4.0: [enhance](memory) consider memtable memory when sink operator back pressure #60291 (#60502)
    
    Cherry-picked from #60291
    
    Co-authored-by: hui lai <[email protected]>
---
 be/src/vec/sink/writer/vtablet_writer.cpp | 29 +++++++++++++++++++++--------
 1 file changed, 21 insertions(+), 8 deletions(-)

diff --git a/be/src/vec/sink/writer/vtablet_writer.cpp b/be/src/vec/sink/writer/vtablet_writer.cpp
index 4864ad79261..270b710634c 100644
--- a/be/src/vec/sink/writer/vtablet_writer.cpp
+++ b/be/src/vec/sink/writer/vtablet_writer.cpp
@@ -748,15 +748,28 @@ Status VNodeChannel::add_block(vectorized::Block* block, const Payload* payload)
     // But there is still some unfinished things, we do mem limit here temporarily.
     // _cancelled may be set by rpc callback, and it's possible that _cancelled might be set in any of the steps below.
     // It's fine to do a fake add_block() and return OK, because we will check _cancelled in next add_block() or mark_close().
-    bool is_exceed_soft_mem_limit = 
GlobalMemoryArbitrator::is_exceed_soft_mem_limit();
-    auto current_load_mem_value = MemoryProfile::load_current_usage();
-    bool mem_limit_exceeded = is_exceed_soft_mem_limit ||
-                              current_load_mem_value > _load_mem_limit ||
-                              _pending_batches_bytes > 
_max_pending_batches_bytes;
-    while (!_cancelled && !_state->is_cancelled() && _pending_batches_num > 0 
&&
-           mem_limit_exceeded) {
+    constexpr int64_t kBackPressureSleepMs = 10;
+    auto* memtable_limiter = ExecEnv::GetInstance()->memtable_memory_limiter();
+    while (true) {
+        bool is_exceed_soft_mem_limit = 
GlobalMemoryArbitrator::is_exceed_soft_mem_limit();
+        int64_t memtable_mem =
+                (memtable_limiter != nullptr && 
memtable_limiter->mem_tracker() != nullptr)
+                        ? memtable_limiter->mem_tracker()->consumption()
+                        : 0;
+        // Note: Memtable memory is not included in load memory statistics (MemoryProfile::load_current_usage())
+        // for performance and memory control complexity reasons. Therefore, we explicitly add memtable memory
+        // consumption here to ensure accurate back pressure decisions and prevent OOM during heavy loads.
+        auto current_load_mem_value = MemoryProfile::load_current_usage() + 
memtable_mem;
+        bool mem_limit_exceeded = is_exceed_soft_mem_limit ||
+                                  current_load_mem_value > _load_mem_limit ||
+                                  _pending_batches_bytes > 
_max_pending_batches_bytes;
+        bool need_back_pressure = !_cancelled && !_state->is_cancelled() &&
+                                  _pending_batches_num > 0 && 
mem_limit_exceeded;
+        if (!need_back_pressure) {
+            break;
+        }
         SCOPED_RAW_TIMER(&_stat.mem_exceeded_block_ns);
-        std::this_thread::sleep_for(std::chrono::milliseconds(10));
+        
std::this_thread::sleep_for(std::chrono::milliseconds(kBackPressureSleepMs));
     }
 
     if (UNLIKELY(!_cur_mutable_block)) {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to