This is an automated email from the ASF dual-hosted git repository. morningman pushed a commit to branch dev-1.0.0 in repository https://gitbox.apache.org/repos/asf/incubator-doris.git
commit ca058465f861f1df95e25d1a3b009e71c5bdf2ea Author: Kang <[email protected]> AuthorDate: Sun Mar 13 22:12:15 2022 +0800 [improvement](memory) fix olap table scan and sink memory usage problem (#8451) Due to unlimited queue in OlapScanNode and NodeChannel, memory usage can be very large for reading and writing large table, e.g 'insert into tableB select * from tableA'. --- be/src/common/config.h | 4 ++- be/src/exec/olap_scan_node.cpp | 22 +++++++++++++--- be/src/exec/olap_scan_node.h | 6 +++++ be/src/exec/olap_scanner.cpp | 10 ++++---- be/src/exec/tablet_sink.cpp | 9 +++++-- be/src/exec/tablet_sink.h | 3 +++ be/src/vec/exec/volap_scan_node.cpp | 51 +++++++++++++++++++++++++++++++------ be/src/vec/exec/volap_scanner.cpp | 8 +++++- 8 files changed, 92 insertions(+), 21 deletions(-) diff --git a/be/src/common/config.h b/be/src/common/config.h index 8f3b0b7..26bd081 100644 --- a/be/src/common/config.h +++ b/be/src/common/config.h @@ -167,8 +167,10 @@ CONF_mInt64(thrift_client_retry_interval_ms, "1000"); CONF_mInt32(doris_scan_range_row_count, "524288"); // size of scanner queue between scanner thread and compute thread CONF_mInt32(doris_scanner_queue_size, "1024"); -// single read execute fragment row size +// single read execute fragment row number CONF_mInt32(doris_scanner_row_num, "16384"); +// single read execute fragment row bytes +CONF_mInt32(doris_scanner_row_bytes, "10485760"); // number of max scan keys CONF_mInt32(doris_max_scan_key_num, "1024"); // the max number of push down values of a single column. 
diff --git a/be/src/exec/olap_scan_node.cpp b/be/src/exec/olap_scan_node.cpp index af26ae1..19ec140 100644 --- a/be/src/exec/olap_scan_node.cpp +++ b/be/src/exec/olap_scan_node.cpp @@ -77,6 +77,8 @@ Status OlapScanNode::init(const TPlanNode& tnode, RuntimeState* state) { _max_pushdown_conditions_per_column = config::max_pushdown_conditions_per_column; } + _max_scanner_queue_size_bytes = query_options.mem_limit / 20; //TODO: session variable percent + /// TODO: could one filter used in the different scan_node ? int filter_size = _runtime_filter_descs.size(); _runtime_filter_ctxs.resize(filter_size); @@ -306,6 +308,7 @@ Status OlapScanNode::get_next(RuntimeState* state, RowBatch* row_batch, bool* eo materialized_batch = _materialized_row_batches.front(); DCHECK(materialized_batch != nullptr); _materialized_row_batches.pop_front(); + _materialized_row_batches_bytes -= materialized_batch->tuple_data_pool()->total_reserved_bytes(); } } @@ -394,12 +397,14 @@ Status OlapScanNode::close(RuntimeState* state) { } _materialized_row_batches.clear(); + _materialized_row_batches_bytes = 0; for (auto row_batch : _scan_row_batches) { delete row_batch; } _scan_row_batches.clear(); + _scan_row_batches_bytes = 0; // OlapScanNode terminate by exception // so that initiative close the Scanner @@ -1371,6 +1376,7 @@ void OlapScanNode::transfer_thread(RuntimeState* state) { int max_thread = _max_materialized_row_batches; if (config::doris_scanner_row_num > state->batch_size()) { max_thread /= config::doris_scanner_row_num / state->batch_size(); + if (max_thread <= 0) max_thread = 1; } // read from scanner while (LIKELY(status.ok())) { @@ -1393,7 +1399,7 @@ void OlapScanNode::transfer_thread(RuntimeState* state) { if (state->fragment_mem_tracker() != nullptr) { mem_consume = state->fragment_mem_tracker()->consumption(); } - if (mem_consume < (mem_limit * 6) / 10) { + if (mem_consume < (mem_limit * 6) / 10 && _scan_row_batches_bytes < _max_scanner_queue_size_bytes / 2) { thread_slot_num = 
max_thread - assigned_thread_num; } else { // Memory already exceed @@ -1473,6 +1479,7 @@ void OlapScanNode::transfer_thread(RuntimeState* state) { if (LIKELY(!_scan_row_batches.empty())) { scan_batch = _scan_row_batches.front(); _scan_row_batches.pop_front(); + _scan_row_batches_bytes -= scan_batch->tuple_data_pool()->total_reserved_bytes(); // delete scan_batch if transfer thread should be stopped // because scan_batch wouldn't be useful anymore @@ -1573,10 +1580,12 @@ void OlapScanNode::scanner_thread(OlapScanner* scanner) { // need yield this thread when we do enough work. However, OlapStorage read // data in pre-aggregate mode, then we can't use storage returned data to // judge if we need to yield. So we record all raw data read in this round - // scan, if this exceed threshold, we yield this thread. + // scan, if this exceed row number or bytes threshold, we yield this thread. int64_t raw_rows_read = scanner->raw_rows_read(); int64_t raw_rows_threshold = raw_rows_read + config::doris_scanner_row_num; - while (!eos && raw_rows_read < raw_rows_threshold) { + int64_t raw_bytes_read = 0; + int64_t raw_bytes_threshold = config::doris_scanner_row_bytes; + while (!eos && raw_rows_read < raw_rows_threshold && raw_bytes_read < raw_bytes_threshold) { if (UNLIKELY(_transfer_done)) { eos = true; status = Status::Cancelled("Cancelled"); @@ -1602,6 +1611,7 @@ void OlapScanNode::scanner_thread(OlapScanner* scanner) { row_batchs.push_back(row_batch); __sync_fetch_and_add(&_buffered_bytes, row_batch->tuple_data_pool()->total_reserved_bytes()); + raw_bytes_read += row_batch->tuple_data_pool()->total_reserved_bytes(); } raw_rows_read = scanner->raw_rows_read(); } @@ -1630,6 +1640,7 @@ void OlapScanNode::scanner_thread(OlapScanner* scanner) { } else { for (auto rb : row_batchs) { _scan_row_batches.push_back(rb); + _scan_row_batches_bytes += rb->tuple_data_pool()->total_reserved_bytes(); } } // If eos is true, we will process out of this lock block. 
@@ -1669,13 +1680,16 @@ Status OlapScanNode::add_one_batch(RowBatch* row_batch) { { std::unique_lock<std::mutex> l(_row_batches_lock); - while (UNLIKELY(_materialized_row_batches.size() >= _max_materialized_row_batches && + // check queue limit for both batch size and bytes + while (UNLIKELY((_materialized_row_batches.size() >= _max_materialized_row_batches || + _materialized_row_batches_bytes >= _max_scanner_queue_size_bytes / 2) && !_transfer_done)) { _row_batch_consumed_cv.wait(l); } VLOG_CRITICAL << "Push row_batch to materialized_row_batches"; _materialized_row_batches.push_back(row_batch); + _materialized_row_batches_bytes += row_batch->tuple_data_pool()->total_reserved_bytes(); } // remove one batch, notify main thread _row_batch_added_cv.notify_one(); diff --git a/be/src/exec/olap_scan_node.h b/be/src/exec/olap_scan_node.h index 5ebd647..16ae85e 100644 --- a/be/src/exec/olap_scan_node.h +++ b/be/src/exec/olap_scan_node.h @@ -216,6 +216,8 @@ protected: std::condition_variable _row_batch_consumed_cv; std::list<RowBatch*> _materialized_row_batches; + // to limit _materialized_row_batches_bytes < _max_scanner_queue_size_bytes / 2 + std::atomic_size_t _materialized_row_batches_bytes = 0; std::mutex _scan_batches_lock; std::condition_variable _scan_batch_added_cv; @@ -223,10 +225,14 @@ protected: std::condition_variable _scan_thread_exit_cv; std::list<RowBatch*> _scan_row_batches; + // to limit _scan_row_batches_bytes < _max_scanner_queue_size_bytes / 2 + std::atomic_size_t _scan_row_batches_bytes = 0; std::list<OlapScanner*> _olap_scanners; int _max_materialized_row_batches; + // to limit _materialized_row_batches_bytes and _scan_row_batches_bytes + size_t _max_scanner_queue_size_bytes; bool _start; // Used in Scan thread to ensure thread-safe std::atomic_bool _scanner_done; diff --git a/be/src/exec/olap_scanner.cpp b/be/src/exec/olap_scanner.cpp index d7dc839..2fccdd5 100644 --- a/be/src/exec/olap_scanner.cpp +++ b/be/src/exec/olap_scanner.cpp @@ -265,11 
+265,14 @@ Status OlapScanner::get_batch(RuntimeState* state, RowBatch* batch, bool* eof) { std::unique_ptr<MemPool> mem_pool(new MemPool(_mem_tracker.get())); int64_t raw_rows_threshold = raw_rows_read() + config::doris_scanner_row_num; + int64_t raw_bytes_threshold = config::doris_scanner_row_bytes; { SCOPED_TIMER(_parent->_scan_timer); while (true) { - // Batch is full, break - if (batch->is_full()) { + // Batch is full or reach raw_rows_threshold or raw_bytes_threshold, break + if (batch->is_full() || + batch->tuple_data_pool()->total_reserved_bytes() >= raw_bytes_threshold || + raw_rows_read() >= raw_rows_threshold) { _update_realtime_counter(); break; } @@ -421,9 +424,6 @@ Status OlapScanner::get_batch(RuntimeState* state, RowBatch* batch, bool* eof) { } } while (false); - if (raw_rows_read() >= raw_rows_threshold) { - break; - } } } diff --git a/be/src/exec/tablet_sink.cpp b/be/src/exec/tablet_sink.cpp index d77fb4e..5b59508 100644 --- a/be/src/exec/tablet_sink.cpp +++ b/be/src/exec/tablet_sink.cpp @@ -102,6 +102,7 @@ Status NodeChannel::init(RuntimeState* state) { _rpc_timeout_ms = state->query_options().query_timeout * 1000; _timeout_watch.start(); + _max_pending_batches_bytes = _parent->_load_mem_limit / 20; //TODO: session variable percent _load_info = "load_id=" + print_id(_parent->_load_id) + ", txn_id=" + std::to_string(_parent->_txn_id); @@ -246,7 +247,7 @@ Status NodeChannel::add_row(Tuple* input_tuple, int64_t tablet_id) { // But there is still some unfinished things, we do mem limit here temporarily. // _cancelled may be set by rpc callback, and it's possible that _cancelled might be set in any of the steps below. // It's fine to do a fake add_row() and return OK, because we will check _cancelled in next add_row() or mark_close(). 
- while (!_cancelled && _parent->_mem_tracker->AnyLimitExceeded(MemLimit::HARD) && + while (!_cancelled && (_pending_batches_bytes > _max_pending_batches_bytes || _parent->_mem_tracker->AnyLimitExceeded(MemLimit::HARD)) && _pending_batches_num > 0) { SCOPED_ATOMIC_TIMER(&_mem_exceeded_block_ns); SleepFor(MonoDelta::FromMilliseconds(10)); @@ -257,6 +258,7 @@ Status NodeChannel::add_row(Tuple* input_tuple, int64_t tablet_id) { { SCOPED_ATOMIC_TIMER(&_queue_push_lock_ns); std::lock_guard<std::mutex> l(_pending_batches_lock); + _pending_batches_bytes += _cur_batch->tuple_data_pool()->total_reserved_bytes(); //To simplify the add_row logic, postpone adding batch into req until the time of sending req _pending_batches.emplace(std::move(_cur_batch), _cur_add_batch_request); _pending_batches_num++; @@ -295,7 +297,7 @@ Status NodeChannel::add_row(BlockRow& block_row, int64_t tablet_id) { // But there is still some unfinished things, we do mem limit here temporarily. // _cancelled may be set by rpc callback, and it's possible that _cancelled might be set in any of the steps below. // It's fine to do a fake add_row() and return OK, because we will check _cancelled in next add_row() or mark_close(). 
- while (!_cancelled && _parent->_mem_tracker->AnyLimitExceeded(MemLimit::HARD) && + while (!_cancelled && (_pending_batches_bytes > _max_pending_batches_bytes || _parent->_mem_tracker->AnyLimitExceeded(MemLimit::HARD)) && _pending_batches_num > 0) { SCOPED_ATOMIC_TIMER(&_mem_exceeded_block_ns); SleepFor(MonoDelta::FromMilliseconds(10)); @@ -306,6 +308,7 @@ Status NodeChannel::add_row(BlockRow& block_row, int64_t tablet_id) { { SCOPED_ATOMIC_TIMER(&_queue_push_lock_ns); std::lock_guard<std::mutex> l(_pending_batches_lock); + _pending_batches_bytes += _cur_batch->tuple_data_pool()->total_reserved_bytes(); //To simplify the add_row logic, postpone adding batch into req until the time of sending req _pending_batches.emplace(std::move(_cur_batch), _cur_add_batch_request); _pending_batches_num++; @@ -341,6 +344,7 @@ Status NodeChannel::mark_close() { { debug::ScopedTSANIgnoreReadsAndWrites ignore_tsan; std::lock_guard<std::mutex> l(_pending_batches_lock); + _pending_batches_bytes += _cur_batch->tuple_data_pool()->total_reserved_bytes(); _pending_batches.emplace(std::move(_cur_batch), _cur_add_batch_request); _pending_batches_num++; DCHECK(_pending_batches.back().second.eos()); @@ -447,6 +451,7 @@ void NodeChannel::try_send_batch() { send_batch = std::move(_pending_batches.front()); _pending_batches.pop(); _pending_batches_num--; + _pending_batches_bytes -= send_batch.first->tuple_data_pool()->total_reserved_bytes(); } auto row_batch = std::move(send_batch.first); diff --git a/be/src/exec/tablet_sink.h b/be/src/exec/tablet_sink.h index 8bd1e96..c485632 100644 --- a/be/src/exec/tablet_sink.h +++ b/be/src/exec/tablet_sink.h @@ -256,6 +256,9 @@ private: using AddBatchReq = std::pair<std::unique_ptr<RowBatch>, PTabletWriterAddBatchRequest>; std::queue<AddBatchReq> _pending_batches; std::atomic<int> _pending_batches_num {0}; + // limit _pending_batches size + std::atomic<size_t> _pending_batches_bytes {0}; + size_t _max_pending_batches_bytes {10 * 1024 * 1024}; 
std::shared_ptr<PBackendService_Stub> _stub = nullptr; RefCountClosure<PTabletWriterOpenResult>* _open_closure = nullptr; diff --git a/be/src/vec/exec/volap_scan_node.cpp b/be/src/vec/exec/volap_scan_node.cpp index 481a630..78a2b46 100644 --- a/be/src/vec/exec/volap_scan_node.cpp +++ b/be/src/vec/exec/volap_scan_node.cpp @@ -110,6 +110,9 @@ void VOlapScanNode::transfer_thread(RuntimeState* state) { // 3 transfer result block when queue is not empty if (LIKELY(!_scan_blocks.empty())) { blocks.swap(_scan_blocks); + for (auto b : blocks) { + _scan_row_batches_bytes -= b->allocated_bytes(); + } // delete scan_block if transfer thread should be stopped // because scan_block wouldn't be useful anymore if (UNLIKELY(_transfer_done)) { @@ -191,12 +194,15 @@ void VOlapScanNode::scanner_thread(VOlapScanner* scanner) { // need yield this thread when we do enough work. However, OlapStorage read // data in pre-aggregate mode, then we can't use storage returned data to // judge if we need to yield. So we record all raw data read in this round - // scan, if this exceed threshold, we yield this thread. + // scan, if this exceed row number or bytes threshold, we yield this thread. int64_t raw_rows_read = scanner->raw_rows_read(); int64_t raw_rows_threshold = raw_rows_read + config::doris_scanner_row_num; + int64_t raw_bytes_read = 0; + int64_t raw_bytes_threshold = config::doris_scanner_row_bytes; bool get_free_block = true; - while (!eos && raw_rows_read < raw_rows_threshold && get_free_block) { + while (!eos && raw_rows_read < raw_rows_threshold && + raw_bytes_read < raw_bytes_threshold && get_free_block) { if (UNLIKELY(_transfer_done)) { eos = true; status = Status::Cancelled("Cancelled"); @@ -214,6 +220,9 @@ void VOlapScanNode::scanner_thread(VOlapScanner* scanner) { eos = true; break; } + + raw_bytes_read += block->allocated_bytes(); + // 4. if status not ok, change status_. 
if (UNLIKELY(block->rows() == 0)) { std::lock_guard<std::mutex> l(_free_blocks_lock); @@ -245,6 +254,9 @@ void VOlapScanNode::scanner_thread(VOlapScanner* scanner) { } else { std::lock_guard<std::mutex> l(_scan_blocks_lock); _scan_blocks.insert(_scan_blocks.end(), blocks.begin(), blocks.end()); + for (auto b : blocks) { + _scan_row_batches_bytes += b->allocated_bytes(); + } } // If eos is true, we will process out of this lock block. if (eos) { scanner->mark_to_need_to_close(); } @@ -275,13 +287,18 @@ Status VOlapScanNode::_add_blocks(std::vector<Block*>& block) { { std::unique_lock<std::mutex> l(_blocks_lock); - while (UNLIKELY(_materialized_blocks.size() >= _max_materialized_blocks && + // check queue limit for both block queue size and bytes + while (UNLIKELY((_materialized_blocks.size() >= _max_materialized_blocks || + _materialized_row_batches_bytes >= _max_scanner_queue_size_bytes / 2) && !_transfer_done)) { _block_consumed_cv.wait(l); } VLOG_CRITICAL << "Push block to materialized_blocks"; _materialized_blocks.insert(_materialized_blocks.end(), block.cbegin(), block.cend()); + for (auto b : block) { + _materialized_row_batches_bytes += b->allocated_bytes(); + } } // remove one block, notify main thread _block_added_cv.notify_one(); @@ -383,8 +400,11 @@ Status VOlapScanNode::close(RuntimeState* state) { // clear some block in queue // TODO: The presence of transfer_thread here may cause Block's memory alloc and be released not in a thread, // which may lead to potential performance problems. 
we should rethink whether to delete the transfer thread - std::for_each(_materialized_blocks.begin(), _materialized_blocks.end(), std::default_delete<Block>()); + std::for_each(_materialized_blocks.begin(), _materialized_blocks.end(), + std::default_delete<Block>()); + _materialized_row_batches_bytes = 0; std::for_each(_scan_blocks.begin(), _scan_blocks.end(), std::default_delete<Block>()); + _scan_row_batches_bytes = 0; std::for_each(_free_blocks.begin(), _free_blocks.end(), std::default_delete<Block>()); _mem_tracker->Release(_buffered_bytes); @@ -458,6 +478,7 @@ Status VOlapScanNode::get_next(RuntimeState* state, Block* block, bool* eos) { materialized_block = _materialized_blocks.back(); DCHECK(materialized_block != NULL); _materialized_blocks.pop_back(); + _materialized_row_batches_bytes -= materialized_block->allocated_bytes(); } } @@ -516,15 +537,29 @@ Block* VOlapScanNode::_alloc_block(bool& get_free_block) { int VOlapScanNode::_start_scanner_thread_task(RuntimeState* state, int block_per_scanner) { std::list<VOlapScanner*> olap_scanners; int assigned_thread_num = _running_thread; + size_t max_thread = std::min(_volap_scanners.size(), + static_cast<size_t>(config::doris_scanner_thread_pool_thread_num)); // copy to local { // How many thread can apply to this query size_t thread_slot_num = 0; { - std::lock_guard<std::mutex> l(_free_blocks_lock); - thread_slot_num = _free_blocks.size() / block_per_scanner; - thread_slot_num += (_free_blocks.size() % block_per_scanner != 0); - if (thread_slot_num == 0) thread_slot_num++; + if (_scan_row_batches_bytes < _max_scanner_queue_size_bytes / 2) { + std::lock_guard<std::mutex> l(_free_blocks_lock); + thread_slot_num = _free_blocks.size() / block_per_scanner; + thread_slot_num += (_free_blocks.size() % block_per_scanner != 0); + thread_slot_num = std::min(thread_slot_num, max_thread - assigned_thread_num); + if (thread_slot_num <= 0) thread_slot_num = 1; + } else { + std::lock_guard<std::mutex> l(_scan_blocks_lock); + 
if (_scan_blocks.empty()) { + // Just for notify if _scan_blocks is empty and no running thread + if (assigned_thread_num == 0) { + thread_slot_num = 1; + // NOTE: if olap_scanners_ is empty, scanner_done_ should be true + } + } + } } { diff --git a/be/src/vec/exec/volap_scanner.cpp b/be/src/vec/exec/volap_scanner.cpp index d99184a..964c000 100644 --- a/be/src/vec/exec/volap_scanner.cpp +++ b/be/src/vec/exec/volap_scanner.cpp @@ -43,6 +43,7 @@ Status VOlapScanner::get_block(RuntimeState* state, vectorized::Block* block, bo DCHECK(block->rows() == 0); int64_t raw_rows_threshold = raw_rows_read() + config::doris_scanner_row_num; + int64_t raw_bytes_threshold = config::doris_scanner_row_bytes; if (!block->mem_reuse()) { for (const auto slot_desc : _tuple_desc->slots()) { block->insert(ColumnWithTypeAndName(slot_desc->get_empty_mutable_column(), @@ -66,7 +67,12 @@ Status VOlapScanner::get_block(RuntimeState* state, vectorized::Block* block, bo RETURN_IF_ERROR( VExprContext::filter_block(_vconjunct_ctx, block, _tuple_desc->slots().size())); - } while (block->rows() == 0 && !(*eof) && raw_rows_read() < raw_rows_threshold); + } while (block->rows() == 0 && !(*eof) && raw_rows_read() < raw_rows_threshold && + block->allocated_bytes() < raw_bytes_threshold); + // NOTE: + // There is no need to check raw_bytes_threshold since block->rows() == 0 is checked first. + // But checking raw_bytes_threshold is still added here for consistency with raw_rows_threshold + // and olap_scanner.cpp. return Status::OK(); } --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
