This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git
The following commit(s) were added to refs/heads/master by this push:
new e3bac86b43 [bugfix](vtablet_sink) fix max_pending_bytes for vtablet_sink (#9462)
e3bac86b43 is described below
commit e3bac86b436a1008d8f4e451ecb7c2c9e318e9af
Author: yixiutt <[email protected]>
AuthorDate: Wed May 11 18:00:56 2022 +0800
[bugfix](vtablet_sink) fix max_pending_bytes for vtablet_sink (#9462)
Co-authored-by: yixiutt <[email protected]>
---
be/src/vec/sink/vtablet_sink.cpp | 6 +++++-
1 file changed, 5 insertions(+), 1 deletion(-)
diff --git a/be/src/vec/sink/vtablet_sink.cpp b/be/src/vec/sink/vtablet_sink.cpp
index 4794a0d003..a86ef57cf2 100644
--- a/be/src/vec/sink/vtablet_sink.cpp
+++ b/be/src/vec/sink/vtablet_sink.cpp
@@ -152,7 +152,10 @@ Status VNodeChannel::add_row(const BlockRow& block_row, int64_t tablet_id) {
// But there is still some unfinished things, we do mem limit here temporarily.
// _cancelled may be set by rpc callback, and it's possible that _cancelled might be set in any of the steps below.
// It's fine to do a fake add_row() and return OK, because we will check _cancelled in next add_row() or mark_close().
- while (!_cancelled && _parent->_mem_tracker->any_limit_exceeded() && _pending_batches_num > 0) {
+ while (!_cancelled &&
+ (_pending_batches_bytes > _max_pending_batches_bytes ||
+ _parent->_mem_tracker->any_limit_exceeded()) &&
+ _pending_batches_num > 0) {
SCOPED_ATOMIC_TIMER(&_mem_exceeded_block_ns);
std::this_thread::sleep_for(std::chrono::milliseconds(10));
}
@@ -165,6 +168,7 @@ Status VNodeChannel::add_row(const BlockRow& block_row, int64_t tablet_id) {
SCOPED_ATOMIC_TIMER(&_queue_push_lock_ns);
std::lock_guard<std::mutex> l(_pending_batches_lock);
//To simplify the add_row logic, postpone adding block into req until the time of sending req
+ _pending_batches_bytes += _cur_mutable_block->allocated_bytes();
_pending_blocks.emplace(std::move(_cur_mutable_block), _cur_add_block_request);
_pending_batches_num++;
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]