This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git


The following commit(s) were added to refs/heads/master by this push:
     new e3bac86b43 [bugfix](vtablet_sink) fix max_pending_bytes for vtablet_sink (#9462)
e3bac86b43 is described below

commit e3bac86b436a1008d8f4e451ecb7c2c9e318e9af
Author: yixiutt <[email protected]>
AuthorDate: Wed May 11 18:00:56 2022 +0800

    [bugfix](vtablet_sink) fix max_pending_bytes for vtablet_sink (#9462)
    
    
    
    Co-authored-by: yixiutt <[email protected]>
---
 be/src/vec/sink/vtablet_sink.cpp | 6 +++++-
 1 file changed, 5 insertions(+), 1 deletion(-)

diff --git a/be/src/vec/sink/vtablet_sink.cpp b/be/src/vec/sink/vtablet_sink.cpp
index 4794a0d003..a86ef57cf2 100644
--- a/be/src/vec/sink/vtablet_sink.cpp
+++ b/be/src/vec/sink/vtablet_sink.cpp
@@ -152,7 +152,10 @@ Status VNodeChannel::add_row(const BlockRow& block_row, int64_t tablet_id) {
     // But there is still some unfinished things, we do mem limit here temporarily.
     // _cancelled may be set by rpc callback, and it's possible that _cancelled might be set in any of the steps below.
     // It's fine to do a fake add_row() and return OK, because we will check _cancelled in next add_row() or mark_close().
-    while (!_cancelled && _parent->_mem_tracker->any_limit_exceeded() && _pending_batches_num > 0) {
+    while (!_cancelled &&
+           (_pending_batches_bytes > _max_pending_batches_bytes ||
+            _parent->_mem_tracker->any_limit_exceeded()) &&
+           _pending_batches_num > 0) {
         SCOPED_ATOMIC_TIMER(&_mem_exceeded_block_ns);
         std::this_thread::sleep_for(std::chrono::milliseconds(10));
     }
@@ -165,6 +168,7 @@ Status VNodeChannel::add_row(const BlockRow& block_row, int64_t tablet_id) {
             SCOPED_ATOMIC_TIMER(&_queue_push_lock_ns);
             std::lock_guard<std::mutex> l(_pending_batches_lock);
             //To simplify the add_row logic, postpone adding block into req until the time of sending req
+            _pending_batches_bytes += _cur_mutable_block->allocated_bytes();
             _pending_blocks.emplace(std::move(_cur_mutable_block), _cur_add_block_request);
             _pending_batches_num++;
         }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to