This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch dev-1.0.0
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git

commit ca058465f861f1df95e25d1a3b009e71c5bdf2ea
Author: Kang <[email protected]>
AuthorDate: Sun Mar 13 22:12:15 2022 +0800

    [improvement](memory) fix olap table scan and sink memory usage problem 
(#8451)
    
    Due to unbounded queues in OlapScanNode and NodeChannel, memory usage can be
    very large when reading and writing large tables, e.g. 'insert into tableB 
select * from tableA'.
---
 be/src/common/config.h              |  4 ++-
 be/src/exec/olap_scan_node.cpp      | 22 +++++++++++++---
 be/src/exec/olap_scan_node.h        |  6 +++++
 be/src/exec/olap_scanner.cpp        | 10 ++++----
 be/src/exec/tablet_sink.cpp         |  9 +++++--
 be/src/exec/tablet_sink.h           |  3 +++
 be/src/vec/exec/volap_scan_node.cpp | 51 +++++++++++++++++++++++++++++++------
 be/src/vec/exec/volap_scanner.cpp   |  8 +++++-
 8 files changed, 92 insertions(+), 21 deletions(-)

diff --git a/be/src/common/config.h b/be/src/common/config.h
index 8f3b0b7..26bd081 100644
--- a/be/src/common/config.h
+++ b/be/src/common/config.h
@@ -167,8 +167,10 @@ CONF_mInt64(thrift_client_retry_interval_ms, "1000");
 CONF_mInt32(doris_scan_range_row_count, "524288");
 // size of scanner queue between scanner thread and compute thread
 CONF_mInt32(doris_scanner_queue_size, "1024");
-// single read execute fragment row size
+// single read execute fragment row number
 CONF_mInt32(doris_scanner_row_num, "16384");
+// single read execute fragment row bytes
+CONF_mInt32(doris_scanner_row_bytes, "10485760");
 // number of max scan keys
 CONF_mInt32(doris_max_scan_key_num, "1024");
 // the max number of push down values of a single column.
diff --git a/be/src/exec/olap_scan_node.cpp b/be/src/exec/olap_scan_node.cpp
index af26ae1..19ec140 100644
--- a/be/src/exec/olap_scan_node.cpp
+++ b/be/src/exec/olap_scan_node.cpp
@@ -77,6 +77,8 @@ Status OlapScanNode::init(const TPlanNode& tnode, 
RuntimeState* state) {
         _max_pushdown_conditions_per_column = 
config::max_pushdown_conditions_per_column;
     }
 
+    _max_scanner_queue_size_bytes = query_options.mem_limit / 20; //TODO: 
session variable percent
+
     /// TODO: could one filter used in the different scan_node ?
     int filter_size = _runtime_filter_descs.size();
     _runtime_filter_ctxs.resize(filter_size);
@@ -306,6 +308,7 @@ Status OlapScanNode::get_next(RuntimeState* state, 
RowBatch* row_batch, bool* eo
             materialized_batch = _materialized_row_batches.front();
             DCHECK(materialized_batch != nullptr);
             _materialized_row_batches.pop_front();
+            _materialized_row_batches_bytes -= 
materialized_batch->tuple_data_pool()->total_reserved_bytes();
         }
     }
 
@@ -394,12 +397,14 @@ Status OlapScanNode::close(RuntimeState* state) {
     }
 
     _materialized_row_batches.clear();
+    _materialized_row_batches_bytes = 0;
 
     for (auto row_batch : _scan_row_batches) {
         delete row_batch;
     }
 
     _scan_row_batches.clear();
+    _scan_row_batches_bytes = 0;
 
     // OlapScanNode terminate by exception
     // so that initiative close the Scanner
@@ -1371,6 +1376,7 @@ void OlapScanNode::transfer_thread(RuntimeState* state) {
     int max_thread = _max_materialized_row_batches;
     if (config::doris_scanner_row_num > state->batch_size()) {
         max_thread /= config::doris_scanner_row_num / state->batch_size();
+        if (max_thread <= 0) max_thread = 1;
     }
     // read from scanner
     while (LIKELY(status.ok())) {
@@ -1393,7 +1399,7 @@ void OlapScanNode::transfer_thread(RuntimeState* state) {
             if (state->fragment_mem_tracker() != nullptr) {
                 mem_consume = state->fragment_mem_tracker()->consumption();
             }
-            if (mem_consume < (mem_limit * 6) / 10) {
+            if (mem_consume < (mem_limit * 6) / 10 && _scan_row_batches_bytes 
< _max_scanner_queue_size_bytes / 2) {
                 thread_slot_num = max_thread - assigned_thread_num;
             } else {
                 // Memory already exceed
@@ -1473,6 +1479,7 @@ void OlapScanNode::transfer_thread(RuntimeState* state) {
             if (LIKELY(!_scan_row_batches.empty())) {
                 scan_batch = _scan_row_batches.front();
                 _scan_row_batches.pop_front();
+                _scan_row_batches_bytes -= 
scan_batch->tuple_data_pool()->total_reserved_bytes();
 
                 // delete scan_batch if transfer thread should be stopped
                 // because scan_batch wouldn't be useful anymore
@@ -1573,10 +1580,12 @@ void OlapScanNode::scanner_thread(OlapScanner* scanner) 
{
     // need yield this thread when we do enough work. However, OlapStorage read
     // data in pre-aggregate mode, then we can't use storage returned data to
     // judge if we need to yield. So we record all raw data read in this round
-    // scan, if this exceed threshold, we yield this thread.
+    // scan; if this exceeds the row number or bytes threshold, we yield this 
thread.
     int64_t raw_rows_read = scanner->raw_rows_read();
     int64_t raw_rows_threshold = raw_rows_read + config::doris_scanner_row_num;
-    while (!eos && raw_rows_read < raw_rows_threshold) {
+    int64_t raw_bytes_read = 0;
+    int64_t raw_bytes_threshold = config::doris_scanner_row_bytes;
+    while (!eos && raw_rows_read < raw_rows_threshold && raw_bytes_read < 
raw_bytes_threshold) {
         if (UNLIKELY(_transfer_done)) {
             eos = true;
             status = Status::Cancelled("Cancelled");
@@ -1602,6 +1611,7 @@ void OlapScanNode::scanner_thread(OlapScanner* scanner) {
             row_batchs.push_back(row_batch);
             __sync_fetch_and_add(&_buffered_bytes,
                                  
row_batch->tuple_data_pool()->total_reserved_bytes());
+            raw_bytes_read += 
row_batch->tuple_data_pool()->total_reserved_bytes();
         }
         raw_rows_read = scanner->raw_rows_read();
     }
@@ -1630,6 +1640,7 @@ void OlapScanNode::scanner_thread(OlapScanner* scanner) {
         } else {
             for (auto rb : row_batchs) {
                 _scan_row_batches.push_back(rb);
+                _scan_row_batches_bytes += 
rb->tuple_data_pool()->total_reserved_bytes();
             }
         }
         // If eos is true, we will process out of this lock block.
@@ -1669,13 +1680,16 @@ Status OlapScanNode::add_one_batch(RowBatch* row_batch) 
{
     {
         std::unique_lock<std::mutex> l(_row_batches_lock);
 
-        while (UNLIKELY(_materialized_row_batches.size() >= 
_max_materialized_row_batches &&
+        // check queue limit for both batch size and bytes
+        while (UNLIKELY((_materialized_row_batches.size() >= 
_max_materialized_row_batches ||
+                         _materialized_row_batches_bytes >= 
_max_scanner_queue_size_bytes / 2) &&
                         !_transfer_done)) {
             _row_batch_consumed_cv.wait(l);
         }
 
         VLOG_CRITICAL << "Push row_batch to materialized_row_batches";
         _materialized_row_batches.push_back(row_batch);
+        _materialized_row_batches_bytes += 
row_batch->tuple_data_pool()->total_reserved_bytes();
     }
     // remove one batch, notify main thread
     _row_batch_added_cv.notify_one();
diff --git a/be/src/exec/olap_scan_node.h b/be/src/exec/olap_scan_node.h
index 5ebd647..16ae85e 100644
--- a/be/src/exec/olap_scan_node.h
+++ b/be/src/exec/olap_scan_node.h
@@ -216,6 +216,8 @@ protected:
     std::condition_variable _row_batch_consumed_cv;
 
     std::list<RowBatch*> _materialized_row_batches;
+    // to limit _materialized_row_batches_bytes < 
_max_scanner_queue_size_bytes / 2
+    std::atomic_size_t _materialized_row_batches_bytes = 0;
 
     std::mutex _scan_batches_lock;
     std::condition_variable _scan_batch_added_cv;
@@ -223,10 +225,14 @@ protected:
     std::condition_variable _scan_thread_exit_cv;
 
     std::list<RowBatch*> _scan_row_batches;
+    // to limit _scan_row_batches_bytes < _max_scanner_queue_size_bytes / 2
+    std::atomic_size_t _scan_row_batches_bytes = 0;
 
     std::list<OlapScanner*> _olap_scanners;
 
     int _max_materialized_row_batches;
+    // to limit _materialized_row_batches_bytes and _scan_row_batches_bytes
+    size_t _max_scanner_queue_size_bytes;
     bool _start;
     // Used in Scan thread to ensure thread-safe
     std::atomic_bool _scanner_done;
diff --git a/be/src/exec/olap_scanner.cpp b/be/src/exec/olap_scanner.cpp
index d7dc839..2fccdd5 100644
--- a/be/src/exec/olap_scanner.cpp
+++ b/be/src/exec/olap_scanner.cpp
@@ -265,11 +265,14 @@ Status OlapScanner::get_batch(RuntimeState* state, 
RowBatch* batch, bool* eof) {
 
     std::unique_ptr<MemPool> mem_pool(new MemPool(_mem_tracker.get()));
     int64_t raw_rows_threshold = raw_rows_read() + 
config::doris_scanner_row_num;
+    int64_t raw_bytes_threshold = config::doris_scanner_row_bytes;
     {
         SCOPED_TIMER(_parent->_scan_timer);
         while (true) {
-            // Batch is full, break
-            if (batch->is_full()) {
+            // Batch is full or reach raw_rows_threshold or 
raw_bytes_threshold, break
+            if (batch->is_full() ||
+                batch->tuple_data_pool()->total_reserved_bytes() >= 
raw_bytes_threshold ||
+                raw_rows_read() >= raw_rows_threshold) {
                 _update_realtime_counter();
                 break;
             }
@@ -421,9 +424,6 @@ Status OlapScanner::get_batch(RuntimeState* state, 
RowBatch* batch, bool* eof) {
                 }
             } while (false);
 
-            if (raw_rows_read() >= raw_rows_threshold) {
-                break;
-            }
         }
     }
 
diff --git a/be/src/exec/tablet_sink.cpp b/be/src/exec/tablet_sink.cpp
index d77fb4e..5b59508 100644
--- a/be/src/exec/tablet_sink.cpp
+++ b/be/src/exec/tablet_sink.cpp
@@ -102,6 +102,7 @@ Status NodeChannel::init(RuntimeState* state) {
 
     _rpc_timeout_ms = state->query_options().query_timeout * 1000;
     _timeout_watch.start();
+    _max_pending_batches_bytes = _parent->_load_mem_limit / 20; //TODO: 
session variable percent
 
     _load_info = "load_id=" + print_id(_parent->_load_id) +
                  ", txn_id=" + std::to_string(_parent->_txn_id);
@@ -246,7 +247,7 @@ Status NodeChannel::add_row(Tuple* input_tuple, int64_t 
tablet_id) {
     // But there is still some unfinished things, we do mem limit here 
temporarily.
     // _cancelled may be set by rpc callback, and it's possible that 
_cancelled might be set in any of the steps below.
     // It's fine to do a fake add_row() and return OK, because we will check 
_cancelled in next add_row() or mark_close().
-    while (!_cancelled && 
_parent->_mem_tracker->AnyLimitExceeded(MemLimit::HARD) &&
+    while (!_cancelled && (_pending_batches_bytes > _max_pending_batches_bytes 
|| _parent->_mem_tracker->AnyLimitExceeded(MemLimit::HARD)) &&
            _pending_batches_num > 0) {
         SCOPED_ATOMIC_TIMER(&_mem_exceeded_block_ns);
         SleepFor(MonoDelta::FromMilliseconds(10));
@@ -257,6 +258,7 @@ Status NodeChannel::add_row(Tuple* input_tuple, int64_t 
tablet_id) {
         {
             SCOPED_ATOMIC_TIMER(&_queue_push_lock_ns);
             std::lock_guard<std::mutex> l(_pending_batches_lock);
+            _pending_batches_bytes += 
_cur_batch->tuple_data_pool()->total_reserved_bytes();
             //To simplify the add_row logic, postpone adding batch into req 
until the time of sending req
             _pending_batches.emplace(std::move(_cur_batch), 
_cur_add_batch_request);
             _pending_batches_num++;
@@ -295,7 +297,7 @@ Status NodeChannel::add_row(BlockRow& block_row, int64_t 
tablet_id) {
     // But there is still some unfinished things, we do mem limit here 
temporarily.
     // _cancelled may be set by rpc callback, and it's possible that 
_cancelled might be set in any of the steps below.
     // It's fine to do a fake add_row() and return OK, because we will check 
_cancelled in next add_row() or mark_close().
-    while (!_cancelled && 
_parent->_mem_tracker->AnyLimitExceeded(MemLimit::HARD) &&
+    while (!_cancelled && (_pending_batches_bytes > _max_pending_batches_bytes 
|| _parent->_mem_tracker->AnyLimitExceeded(MemLimit::HARD)) &&
            _pending_batches_num > 0) {
         SCOPED_ATOMIC_TIMER(&_mem_exceeded_block_ns);
         SleepFor(MonoDelta::FromMilliseconds(10));
@@ -306,6 +308,7 @@ Status NodeChannel::add_row(BlockRow& block_row, int64_t 
tablet_id) {
         {
             SCOPED_ATOMIC_TIMER(&_queue_push_lock_ns);
             std::lock_guard<std::mutex> l(_pending_batches_lock);
+            _pending_batches_bytes += 
_cur_batch->tuple_data_pool()->total_reserved_bytes();
             //To simplify the add_row logic, postpone adding batch into req 
until the time of sending req
             _pending_batches.emplace(std::move(_cur_batch), 
_cur_add_batch_request);
             _pending_batches_num++;
@@ -341,6 +344,7 @@ Status NodeChannel::mark_close() {
     {
         debug::ScopedTSANIgnoreReadsAndWrites ignore_tsan;
         std::lock_guard<std::mutex> l(_pending_batches_lock);
+        _pending_batches_bytes += 
_cur_batch->tuple_data_pool()->total_reserved_bytes();
         _pending_batches.emplace(std::move(_cur_batch), 
_cur_add_batch_request);
         _pending_batches_num++;
         DCHECK(_pending_batches.back().second.eos());
@@ -447,6 +451,7 @@ void NodeChannel::try_send_batch() {
         send_batch = std::move(_pending_batches.front());
         _pending_batches.pop();
         _pending_batches_num--;
+        _pending_batches_bytes -= 
send_batch.first->tuple_data_pool()->total_reserved_bytes();
     }
 
     auto row_batch = std::move(send_batch.first);
diff --git a/be/src/exec/tablet_sink.h b/be/src/exec/tablet_sink.h
index 8bd1e96..c485632 100644
--- a/be/src/exec/tablet_sink.h
+++ b/be/src/exec/tablet_sink.h
@@ -256,6 +256,9 @@ private:
     using AddBatchReq = std::pair<std::unique_ptr<RowBatch>, 
PTabletWriterAddBatchRequest>;
     std::queue<AddBatchReq> _pending_batches;
     std::atomic<int> _pending_batches_num {0};
+    // limit _pending_batches size
+    std::atomic<size_t> _pending_batches_bytes {0};
+    size_t _max_pending_batches_bytes {10 * 1024 * 1024};
 
     std::shared_ptr<PBackendService_Stub> _stub = nullptr;
     RefCountClosure<PTabletWriterOpenResult>* _open_closure = nullptr;
diff --git a/be/src/vec/exec/volap_scan_node.cpp 
b/be/src/vec/exec/volap_scan_node.cpp
index 481a630..78a2b46 100644
--- a/be/src/vec/exec/volap_scan_node.cpp
+++ b/be/src/vec/exec/volap_scan_node.cpp
@@ -110,6 +110,9 @@ void VOlapScanNode::transfer_thread(RuntimeState* state) {
             // 3 transfer result block when queue is not empty
             if (LIKELY(!_scan_blocks.empty())) {
                 blocks.swap(_scan_blocks);
+                for (auto b : blocks) {
+                    _scan_row_batches_bytes -= b->allocated_bytes();
+                }
                 // delete scan_block if transfer thread should be stopped
                 // because scan_block wouldn't be useful anymore
                 if (UNLIKELY(_transfer_done)) {
@@ -191,12 +194,15 @@ void VOlapScanNode::scanner_thread(VOlapScanner* scanner) 
{
     // need yield this thread when we do enough work. However, OlapStorage read
     // data in pre-aggregate mode, then we can't use storage returned data to
     // judge if we need to yield. So we record all raw data read in this round
-    // scan, if this exceed threshold, we yield this thread.
+    // scan; if this exceeds the row number or bytes threshold, we yield this 
thread.
     int64_t raw_rows_read = scanner->raw_rows_read();
     int64_t raw_rows_threshold = raw_rows_read + config::doris_scanner_row_num;
+    int64_t raw_bytes_read = 0;
+    int64_t raw_bytes_threshold = config::doris_scanner_row_bytes;
     bool get_free_block = true;
 
-    while (!eos && raw_rows_read < raw_rows_threshold && get_free_block) {
+    while (!eos && raw_rows_read < raw_rows_threshold &&
+           raw_bytes_read < raw_bytes_threshold && get_free_block) {
         if (UNLIKELY(_transfer_done)) {
             eos = true;
             status = Status::Cancelled("Cancelled");
@@ -214,6 +220,9 @@ void VOlapScanNode::scanner_thread(VOlapScanner* scanner) {
             eos = true;
             break;
         }
+
+        raw_bytes_read += block->allocated_bytes();
+
         // 4. if status not ok, change status_.
         if (UNLIKELY(block->rows() == 0)) {
             std::lock_guard<std::mutex> l(_free_blocks_lock);
@@ -245,6 +254,9 @@ void VOlapScanNode::scanner_thread(VOlapScanner* scanner) {
         } else {
             std::lock_guard<std::mutex> l(_scan_blocks_lock);
             _scan_blocks.insert(_scan_blocks.end(), blocks.begin(), 
blocks.end());
+            for (auto b : blocks) {
+                _scan_row_batches_bytes += b->allocated_bytes();
+            }
         }
         // If eos is true, we will process out of this lock block.
         if (eos) { scanner->mark_to_need_to_close(); }
@@ -275,13 +287,18 @@ Status VOlapScanNode::_add_blocks(std::vector<Block*>& 
block) {
     {
         std::unique_lock<std::mutex> l(_blocks_lock);
 
-        while (UNLIKELY(_materialized_blocks.size() >= 
_max_materialized_blocks &&
+        // check queue limit for both block queue size and bytes
+        while (UNLIKELY((_materialized_blocks.size() >= 
_max_materialized_blocks ||
+                         _materialized_row_batches_bytes >= 
_max_scanner_queue_size_bytes / 2) &&
                         !_transfer_done)) {
             _block_consumed_cv.wait(l);
         }
 
         VLOG_CRITICAL << "Push block to materialized_blocks";
         _materialized_blocks.insert(_materialized_blocks.end(), 
block.cbegin(), block.cend());
+        for (auto b : block) {
+            _materialized_row_batches_bytes += b->allocated_bytes();
+        }
     }
     // remove one block, notify main thread
     _block_added_cv.notify_one();
@@ -383,8 +400,11 @@ Status VOlapScanNode::close(RuntimeState* state) {
     // clear some block in queue
     // TODO: The presence of transfer_thread here may cause Block's memory 
alloc and be released not in a thread,
     // which may lead to potential performance problems. we should rethink 
whether to delete the transfer thread
-    std::for_each(_materialized_blocks.begin(), _materialized_blocks.end(), 
std::default_delete<Block>());
+    std::for_each(_materialized_blocks.begin(), _materialized_blocks.end(),
+                  std::default_delete<Block>());
+    _materialized_row_batches_bytes = 0;
     std::for_each(_scan_blocks.begin(), _scan_blocks.end(), 
std::default_delete<Block>());
+    _scan_row_batches_bytes = 0;
     std::for_each(_free_blocks.begin(), _free_blocks.end(), 
std::default_delete<Block>());
     _mem_tracker->Release(_buffered_bytes);
 
@@ -458,6 +478,7 @@ Status VOlapScanNode::get_next(RuntimeState* state, Block* 
block, bool* eos) {
             materialized_block = _materialized_blocks.back();
             DCHECK(materialized_block != NULL);
             _materialized_blocks.pop_back();
+            _materialized_row_batches_bytes -= 
materialized_block->allocated_bytes();
         }
     }
 
@@ -516,15 +537,29 @@ Block* VOlapScanNode::_alloc_block(bool& get_free_block) {
 int VOlapScanNode::_start_scanner_thread_task(RuntimeState* state, int 
block_per_scanner) {
     std::list<VOlapScanner*> olap_scanners;
     int assigned_thread_num = _running_thread;
+    size_t max_thread = std::min(_volap_scanners.size(),
+                     
static_cast<size_t>(config::doris_scanner_thread_pool_thread_num));
     // copy to local
     {
         // How many thread can apply to this query
         size_t thread_slot_num = 0;
         {
-            std::lock_guard<std::mutex> l(_free_blocks_lock);
-            thread_slot_num = _free_blocks.size() / block_per_scanner;
-            thread_slot_num += (_free_blocks.size() % block_per_scanner != 0);
-            if (thread_slot_num == 0) thread_slot_num++;
+            if (_scan_row_batches_bytes < _max_scanner_queue_size_bytes / 2) {
+                std::lock_guard<std::mutex> l(_free_blocks_lock);
+                thread_slot_num = _free_blocks.size() / block_per_scanner;
+                thread_slot_num += (_free_blocks.size() % block_per_scanner != 
0);
+                thread_slot_num = std::min(thread_slot_num, max_thread - 
assigned_thread_num);
+                if (thread_slot_num <= 0) thread_slot_num = 1;
+            } else {
+                std::lock_guard<std::mutex> l(_scan_blocks_lock);
+                if (_scan_blocks.empty()) {
+                    // Just for notify if _scan_blocks is empty and no running 
thread
+                    if (assigned_thread_num == 0) {
+                        thread_slot_num = 1;
+                        // NOTE: if olap_scanners_ is empty, scanner_done_ 
should be true
+                    }
+                }
+            }
         }
 
         {
diff --git a/be/src/vec/exec/volap_scanner.cpp 
b/be/src/vec/exec/volap_scanner.cpp
index d99184a..964c000 100644
--- a/be/src/vec/exec/volap_scanner.cpp
+++ b/be/src/vec/exec/volap_scanner.cpp
@@ -43,6 +43,7 @@ Status VOlapScanner::get_block(RuntimeState* state, 
vectorized::Block* block, bo
     DCHECK(block->rows() == 0);
 
     int64_t raw_rows_threshold = raw_rows_read() + 
config::doris_scanner_row_num;
+    int64_t raw_bytes_threshold = config::doris_scanner_row_bytes;
     if (!block->mem_reuse()) {
         for (const auto slot_desc : _tuple_desc->slots()) {
             
block->insert(ColumnWithTypeAndName(slot_desc->get_empty_mutable_column(),
@@ -66,7 +67,12 @@ Status VOlapScanner::get_block(RuntimeState* state, 
vectorized::Block* block, bo
 
         RETURN_IF_ERROR(
                 VExprContext::filter_block(_vconjunct_ctx, block, 
_tuple_desc->slots().size()));
-    } while (block->rows() == 0 && !(*eof) && raw_rows_read() < 
raw_rows_threshold);
+    } while (block->rows() == 0 && !(*eof) && raw_rows_read() < 
raw_rows_threshold &&
+             block->allocated_bytes() < raw_bytes_threshold);
+    // NOTE:
+    // There is no need to check raw_bytes_threshold since block->rows() == 0 
is checked first.
+    // But checking raw_bytes_threshold is still added here for consistency 
with raw_rows_threshold
+    // and olap_scanner.cpp.
 
     return Status::OK();
 }

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to