This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch branch-4.0
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-4.0 by this push:
     new 1a32b1b56d4 branch-4.0: [enhance](memory) back pressure writing when 
memory usage is high in sink operation #58530 (#58704)
1a32b1b56d4 is described below

commit 1a32b1b56d4afc7cb4e6bc2456aaa07f93a9f90b
Author: github-actions[bot] 
<41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Mon Jan 5 16:56:51 2026 +0800

    branch-4.0: [enhance](memory) back pressure writing when memory usage is 
high in sink operation #58530 (#58704)
    
    Cherry-picked from #58530
    
    Co-authored-by: hui lai <[email protected]>
---
 be/src/vec/sink/writer/vtablet_writer.cpp | 8 +++++++-
 be/src/vec/sink/writer/vtablet_writer.h   | 1 +
 2 files changed, 8 insertions(+), 1 deletion(-)

diff --git a/be/src/vec/sink/writer/vtablet_writer.cpp 
b/be/src/vec/sink/writer/vtablet_writer.cpp
index 62531151f82..b4a76da9f9b 100644
--- a/be/src/vec/sink/writer/vtablet_writer.cpp
+++ b/be/src/vec/sink/writer/vtablet_writer.cpp
@@ -509,6 +509,7 @@ VNodeChannel::VNodeChannel(VTabletWriter* parent, 
IndexChannel* index_channel, i
     _node_channel_tracker = std::make_shared<MemTracker>(
             fmt::format("NodeChannel:indexID={}:threadId={}",
                         std::to_string(_index_channel->_index_id), 
ThreadContext::get_thread_id()));
+    _load_mem_limit = MemInfo::mem_limit() * 
config::load_process_max_memory_limit_percent / 100;
 }
 
 VNodeChannel::~VNodeChannel() = default;
@@ -747,8 +748,13 @@ Status VNodeChannel::add_block(vectorized::Block* block, 
const Payload* payload)
     // But there is still some unfinished things, we do mem limit here 
temporarily.
     // _cancelled may be set by rpc callback, and it's possible that 
_cancelled might be set in any of the steps below.
     // It's fine to do a fake add_block() and return OK, because we will check 
_cancelled in next add_block() or mark_close().
+    bool is_exceed_soft_mem_limit = 
GlobalMemoryArbitrator::is_exceed_soft_mem_limit();
+    auto current_load_mem_value = MemoryProfile::load_current_usage();
+    bool mem_limit_exceeded = is_exceed_soft_mem_limit ||
+                              current_load_mem_value > _load_mem_limit ||
+                              _pending_batches_bytes > 
_max_pending_batches_bytes;
     while (!_cancelled && !_state->is_cancelled() && _pending_batches_num > 0 
&&
-           _pending_batches_bytes > _max_pending_batches_bytes) {
+           mem_limit_exceeded) {
         SCOPED_RAW_TIMER(&_stat.mem_exceeded_block_ns);
         std::this_thread::sleep_for(std::chrono::milliseconds(10));
     }
diff --git a/be/src/vec/sink/writer/vtablet_writer.h 
b/be/src/vec/sink/writer/vtablet_writer.h
index ba1c0c505d6..039dd10a2c7 100644
--- a/be/src/vec/sink/writer/vtablet_writer.h
+++ b/be/src/vec/sink/writer/vtablet_writer.h
@@ -357,6 +357,7 @@ protected:
     std::string _name;
 
     std::shared_ptr<MemTracker> _node_channel_tracker;
+    int64_t _load_mem_limit = -1;
 
     TupleDescriptor* _tuple_desc = nullptr;
     NodeInfo _node_info;


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to