This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch branch-4.0
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-4.0 by this push:
new 1a32b1b56d4 branch-4.0: [enhance](memory) back pressure writing when
memory usage is high in sink operation #58530 (#58704)
1a32b1b56d4 is described below
commit 1a32b1b56d4afc7cb4e6bc2456aaa07f93a9f90b
Author: github-actions[bot]
<41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Mon Jan 5 16:56:51 2026 +0800
branch-4.0: [enhance](memory) back pressure writing when memory usage is
high in sink operation #58530 (#58704)
Cherry-picked from #58530
Co-authored-by: hui lai <[email protected]>
---
be/src/vec/sink/writer/vtablet_writer.cpp | 8 +++++++-
be/src/vec/sink/writer/vtablet_writer.h | 1 +
2 files changed, 8 insertions(+), 1 deletion(-)
diff --git a/be/src/vec/sink/writer/vtablet_writer.cpp
b/be/src/vec/sink/writer/vtablet_writer.cpp
index 62531151f82..b4a76da9f9b 100644
--- a/be/src/vec/sink/writer/vtablet_writer.cpp
+++ b/be/src/vec/sink/writer/vtablet_writer.cpp
@@ -509,6 +509,7 @@ VNodeChannel::VNodeChannel(VTabletWriter* parent,
IndexChannel* index_channel, i
_node_channel_tracker = std::make_shared<MemTracker>(
fmt::format("NodeChannel:indexID={}:threadId={}",
std::to_string(_index_channel->_index_id),
ThreadContext::get_thread_id()));
+ _load_mem_limit = MemInfo::mem_limit() *
config::load_process_max_memory_limit_percent / 100;
}
VNodeChannel::~VNodeChannel() = default;
@@ -747,8 +748,13 @@ Status VNodeChannel::add_block(vectorized::Block* block,
const Payload* payload)
// But there is still some unfinished things, we do mem limit here
temporarily.
// _cancelled may be set by rpc callback, and it's possible that
_cancelled might be set in any of the steps below.
// It's fine to do a fake add_block() and return OK, because we will check
_cancelled in next add_block() or mark_close().
+ bool is_exceed_soft_mem_limit =
GlobalMemoryArbitrator::is_exceed_soft_mem_limit();
+ auto current_load_mem_value = MemoryProfile::load_current_usage();
+ bool mem_limit_exceeded = is_exceed_soft_mem_limit ||
+ current_load_mem_value > _load_mem_limit ||
+ _pending_batches_bytes >
_max_pending_batches_bytes;
while (!_cancelled && !_state->is_cancelled() && _pending_batches_num > 0
&&
- _pending_batches_bytes > _max_pending_batches_bytes) {
+ mem_limit_exceeded) {
SCOPED_RAW_TIMER(&_stat.mem_exceeded_block_ns);
std::this_thread::sleep_for(std::chrono::milliseconds(10));
}
diff --git a/be/src/vec/sink/writer/vtablet_writer.h
b/be/src/vec/sink/writer/vtablet_writer.h
index ba1c0c505d6..039dd10a2c7 100644
--- a/be/src/vec/sink/writer/vtablet_writer.h
+++ b/be/src/vec/sink/writer/vtablet_writer.h
@@ -357,6 +357,7 @@ protected:
std::string _name;
std::shared_ptr<MemTracker> _node_channel_tracker;
+ int64_t _load_mem_limit = -1;
TupleDescriptor* _tuple_desc = nullptr;
NodeInfo _node_info;
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]