This is an automated email from the ASF dual-hosted git repository.
morrysnow pushed a commit to branch branch-3.1
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-3.1 by this push:
new 93e651a8a63 branch-3.1: [enhance](memory) back pressure writing when memory usage is high in sink operation #58530 (#58848)
93e651a8a63 is described below
commit 93e651a8a63f5f8f13e8fb57480b6107d828311c
Author: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Wed Dec 17 17:20:41 2025 +0800
branch-3.1: [enhance](memory) back pressure writing when memory usage is high in sink operation #58530 (#58848)
Cherry-picked from #58530
Co-authored-by: hui lai <[email protected]>
---
be/src/vec/sink/writer/vtablet_writer.cpp | 8 +++++++-
be/src/vec/sink/writer/vtablet_writer.h | 1 +
2 files changed, 8 insertions(+), 1 deletion(-)
diff --git a/be/src/vec/sink/writer/vtablet_writer.cpp b/be/src/vec/sink/writer/vtablet_writer.cpp
index b0539551f70..0ec1c8360f8 100644
--- a/be/src/vec/sink/writer/vtablet_writer.cpp
+++ b/be/src/vec/sink/writer/vtablet_writer.cpp
@@ -505,6 +505,7 @@ VNodeChannel::VNodeChannel(VTabletWriter* parent, IndexChannel* index_channel, i
_node_channel_tracker = std::make_shared<MemTracker>(
fmt::format("NodeChannel:indexID={}:threadId={}",
std::to_string(_index_channel->_index_id),
ThreadContext::get_thread_id()));
+ _load_mem_limit = MemInfo::mem_limit() * config::load_process_max_memory_limit_percent / 100;
}
VNodeChannel::~VNodeChannel() = default;
@@ -743,8 +744,13 @@ Status VNodeChannel::add_block(vectorized::Block* block, const Payload* payload)
// But there is still some unfinished things, we do mem limit here temporarily.
// _cancelled may be set by rpc callback, and it's possible that _cancelled might be set in any of the steps below.
// It's fine to do a fake add_block() and return OK, because we will check _cancelled in next add_block() or mark_close().
+ bool is_exceed_soft_mem_limit = GlobalMemoryArbitrator::is_exceed_soft_mem_limit();
+ auto current_load_mem_value = MemoryProfile::load_current_usage();
+ bool mem_limit_exceeded = is_exceed_soft_mem_limit ||
+ current_load_mem_value > _load_mem_limit ||
+ _pending_batches_bytes > _max_pending_batches_bytes;
while (!_cancelled && !_state->is_cancelled() && _pending_batches_num > 0 &&
- _pending_batches_bytes > _max_pending_batches_bytes) {
+ mem_limit_exceeded) {
SCOPED_RAW_TIMER(&_stat.mem_exceeded_block_ns);
std::this_thread::sleep_for(std::chrono::milliseconds(10));
}
diff --git a/be/src/vec/sink/writer/vtablet_writer.h b/be/src/vec/sink/writer/vtablet_writer.h
index 70aeb2d0e16..636399e90d5 100644
--- a/be/src/vec/sink/writer/vtablet_writer.h
+++ b/be/src/vec/sink/writer/vtablet_writer.h
@@ -350,6 +350,7 @@ protected:
std::string _name;
std::shared_ptr<MemTracker> _node_channel_tracker;
+ int64_t _load_mem_limit = -1;
TupleDescriptor* _tuple_desc = nullptr;
NodeInfo _node_info;
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]