This is an automated email from the ASF dual-hosted git repository. yiguolei pushed a commit to branch branch-2.1 in repository https://gitbox.apache.org/repos/asf/doris.git
commit 17f042b85af413629dd2dceab233c46e48f5debf Author: Ashin Gau <[email protected]> AuthorDate: Mon Feb 26 16:32:51 2024 +0800 [opt](scanner) scan enough blocks in each scan task (#31277) --- be/src/vec/exec/scan/scanner_context.cpp | 151 ++++++++++++----------------- be/src/vec/exec/scan/scanner_context.h | 27 ++---- be/src/vec/exec/scan/scanner_scheduler.cpp | 36 ++++--- 3 files changed, 90 insertions(+), 124 deletions(-) diff --git a/be/src/vec/exec/scan/scanner_context.cpp b/be/src/vec/exec/scan/scanner_context.cpp index 45e934ee790..1edf77798d0 100644 --- a/be/src/vec/exec/scan/scanner_context.cpp +++ b/be/src/vec/exec/scan/scanner_context.cpp @@ -146,8 +146,7 @@ Status ScannerContext::init() { for (int i = 0; i < _max_thread_num; ++i) { std::weak_ptr<ScannerDelegate> next_scanner; if (_scanners.try_dequeue(next_scanner)) { - vectorized::BlockUPtr block = get_free_block(); - submit_scan_task(std::make_shared<ScanTask>(next_scanner, std::move(block))); + submit_scan_task(std::make_shared<ScanTask>(next_scanner)); _num_running_scanners++; } } @@ -159,27 +158,27 @@ std::string ScannerContext::parent_name() { return _parent ? _parent->get_name() : _local_state->get_name(); } -vectorized::BlockUPtr ScannerContext::get_free_block() { +vectorized::BlockUPtr ScannerContext::get_free_block(bool force) { vectorized::BlockUPtr block; if (_free_blocks.try_dequeue(block)) { - std::lock_guard<std::mutex> fl(_free_blocks_lock); DCHECK(block->mem_reuse()); _free_blocks_memory_usage -= block->allocated_bytes(); _free_blocks_memory_usage_mark->set(_free_blocks_memory_usage); return block; } - - _newly_create_free_blocks_num->update(1); - return vectorized::Block::create_unique(_output_tuple_desc->slots(), _batch_size, - true /*ignore invalid slots*/); + if (_free_blocks_memory_usage < _max_bytes_in_queue || force) { + return vectorized::Block::create_unique(_output_tuple_desc->slots(), _batch_size, + true /*ignore invalid slots*/); + _newly_create_free_blocks_num->update(1); + } + return nullptr; } void ScannerContext::return_free_block(vectorized::BlockUPtr block) { - std::lock_guard<std::mutex> fl(_free_blocks_lock); if (block->mem_reuse() && _free_blocks_memory_usage < _max_bytes_in_queue) { - block->clear_column_data(); _free_blocks_memory_usage += block->allocated_bytes(); _free_blocks_memory_usage_mark->set(_free_blocks_memory_usage); + block->clear_column_data(); _free_blocks.enqueue(std::move(block)); } } @@ -196,10 +195,15 @@ void ScannerContext::submit_scan_task(std::shared_ptr<ScanTask> scan_task) { } void ScannerContext::append_block_to_queue(std::shared_ptr<ScanTask> scan_task) { - if (scan_task->status_ok() && scan_task->current_block->rows() > 0) { - Status st = validate_block_schema(scan_task->current_block.get()); - if (!st.ok()) { - scan_task->set_status(st); + if (scan_task->status_ok()) { + for (const vectorized::BlockUPtr& block : scan_task->cached_blocks) { + if (block->rows() > 0) { + Status st = validate_block_schema(block.get()); + if (!st.ok()) { + scan_task->set_status(st); + break; + } + } } } std::lock_guard<std::mutex> l(_transfer_lock); @@ -241,7 +245,6 @@ Status ScannerContext::get_block_from_queue(RuntimeState* state, vectorized::Blo if (!_blocks_queue.empty() && !done()) { _last_fetch_time = UnixMillis(); scan_task = _blocks_queue.front(); - _blocks_queue.pop_front(); } if (scan_task) { @@ -249,52 +252,43 @@ Status ScannerContext::get_block_from_queue(RuntimeState* state, vectorized::Blo _set_scanner_done(); return scan_task->get_status(); } - // We can only know the block size after reading at least one block - // Just take the size of first block as `_estimated_block_size` - if (scan_task->first_block) { - std::lock_guard<std::mutex> fl(_free_blocks_lock); - size_t block_size = scan_task->current_block->allocated_bytes(); - _free_blocks_memory_usage += block_size; - _free_blocks_memory_usage_mark->set(_free_blocks_memory_usage); - scan_task->first_block = false; - if (block_size > _estimated_block_size) { - _estimated_block_size = block_size; - } + DCHECK(!scan_task->cached_blocks.empty()); + vectorized::BlockUPtr current_block = std::move(scan_task->cached_blocks.front()); + scan_task->cached_blocks.pop_front(); + size_t block_size = current_block->allocated_bytes(); + if (_estimated_block_size > block_size) { + _estimated_block_size = block_size; } + _free_blocks_memory_usage -= block_size; + _free_blocks_memory_usage_mark->set(_free_blocks_memory_usage); // consume current block - block->swap(*scan_task->current_block); - if (!scan_task->current_block->mem_reuse()) { - // it depends on the memory strategy of ScanNode/ScanOperator - // we should double check `mem_reuse()` of `current_block` to make sure it can be reused - _newly_create_free_blocks_num->update(1); - scan_task->current_block = vectorized::Block::create_unique(_output_tuple_desc->slots(), - _batch_size, true); - } - if (scan_task->is_eos()) { // current scanner is finished, and no more data to read - _num_finished_scanners++; - std::weak_ptr<ScannerDelegate> next_scanner; - // submit one of the remaining scanners - if (_scanners.try_dequeue(next_scanner)) { - // reuse current running scanner, just reset some states. - scan_task->reuse_scanner(next_scanner); - submit_scan_task(scan_task); - } else { - // no more scanner to be scheduled - // `_free_blocks` serve all running scanners, maybe it's too large for the remaining scanners - int free_blocks_for_each = _free_blocks.size_approx() / _num_running_scanners; - _num_running_scanners--; - std::lock_guard<std::mutex> fl(_free_blocks_lock); - for (int i = 0; i < free_blocks_for_each; ++i) { - vectorized::BlockUPtr removed_block; - if (_free_blocks.try_dequeue(removed_block)) { - _free_blocks_memory_usage -= block->allocated_bytes(); - _free_blocks_memory_usage_mark->set(_free_blocks_memory_usage); + block->swap(*current_block); + return_free_block(std::move(current_block)); + if (scan_task->cached_blocks.empty()) { + _blocks_queue.pop_front(); + if (scan_task->is_eos()) { // current scanner is finished, and no more data to read + _num_finished_scanners++; + std::weak_ptr<ScannerDelegate> next_scanner; + // submit one of the remaining scanners + if (_scanners.try_dequeue(next_scanner)) { + submit_scan_task(std::make_shared<ScanTask>(next_scanner)); + } else { + // no more scanner to be scheduled + // `_free_blocks` serve all running scanners, maybe it's too large for the remaining scanners + int free_blocks_for_each = _free_blocks.size_approx() / _num_running_scanners; + _num_running_scanners--; + for (int i = 0; i < free_blocks_for_each; ++i) { + vectorized::BlockUPtr removed_block; + if (_free_blocks.try_dequeue(removed_block)) { + _free_blocks_memory_usage -= block->allocated_bytes(); + _free_blocks_memory_usage_mark->set(_free_blocks_memory_usage); + } } } + } else { + // resubmit current running scanner to read the next block + submit_scan_task(scan_task); } - } else { - // resubmit current running scanner to read the next block - submit_scan_task(scan_task); } // scale up _try_to_scale_up(); @@ -328,44 +322,23 @@ void ScannerContext::_try_to_scale_up() { return; } - std::lock_guard<std::mutex> fl(_free_blocks_lock); bool is_scale_up = false; // calculate the number of scanners that can be scheduled int num_add = std::min(_num_running_scanners * SCALE_UP_RATIO, _max_thread_num * MAX_SCALE_UP_RATIO - _num_running_scanners); - num_add = std::max(num_add, 1); + if (_estimated_block_size > 0) { + int most_add = + (_max_bytes_in_queue - _free_blocks_memory_usage) / _estimated_block_size; + num_add = std::min(num_add, most_add); + } for (int i = 0; i < num_add; ++i) { - vectorized::BlockUPtr allocate_block = nullptr; - // reuse block in `_free_blocks` firstly - if (!_free_blocks.try_dequeue(allocate_block)) { - if (_free_blocks_memory_usage < _max_bytes_in_queue) { - _newly_create_free_blocks_num->update(1); - allocate_block = vectorized::Block::create_unique(_output_tuple_desc->slots(), - _batch_size, true); - } - } else { - // comes from `_free_blocks`, decrease first, then will be added back. - _free_blocks_memory_usage -= allocate_block->allocated_bytes(); - _free_blocks_memory_usage_mark->set(_free_blocks_memory_usage); - } - if (allocate_block) { - // get enough memory to launch one more scanner. - std::weak_ptr<ScannerDelegate> scale_up_scanner; - if (_scanners.try_dequeue(scale_up_scanner)) { - std::shared_ptr<ScanTask> scale_up_task = - std::make_shared<ScanTask>(scale_up_scanner, std::move(allocate_block)); - _free_blocks_memory_usage += _estimated_block_size; - _free_blocks_memory_usage_mark->set(_free_blocks_memory_usage); - // `first_block` is used to update `_free_blocks_memory_usage`, - // we have got the `_estimated_block_size`, no need for further updates - scale_up_task->first_block = false; - submit_scan_task(scale_up_task); - _num_running_scanners++; - _scale_up_scanners_counter->update(1); - is_scale_up = true; - } else { - break; - } + // get enough memory to launch one more scanner. + std::weak_ptr<ScannerDelegate> scale_up_scanner; + if (_scanners.try_dequeue(scale_up_scanner)) { + submit_scan_task(std::make_shared<ScanTask>(scale_up_scanner)); + _num_running_scanners++; + _scale_up_scanners_counter->update(1); + is_scale_up = true; } else { break; } diff --git a/be/src/vec/exec/scan/scanner_context.h b/be/src/vec/exec/scan/scanner_context.h index ca80d6e748c..393920a7462 100644 --- a/be/src/vec/exec/scan/scanner_context.h +++ b/be/src/vec/exec/scan/scanner_context.h @@ -59,8 +59,7 @@ class SimplifiedScanScheduler; class ScanTask { public: - ScanTask(std::weak_ptr<ScannerDelegate> delegate_scanner, vectorized::BlockUPtr free_block) - : scanner(delegate_scanner), current_block(std::move(free_block)) {} + ScanTask(std::weak_ptr<ScannerDelegate> delegate_scanner) : scanner(delegate_scanner) {} private: // whether current scanner is finished @@ -69,10 +68,7 @@ private: public: std::weak_ptr<ScannerDelegate> scanner; - // cache the block of current loop - vectorized::BlockUPtr current_block; - // only take the size of the first block as estimated size - bool first_block = true; + std::list<vectorized::BlockUPtr> cached_blocks; uint64_t last_submit_time; // nanoseconds void set_status(Status _status) { @@ -86,16 +82,6 @@ public: bool status_ok() { return status.ok() || status.is<ErrorCode::END_OF_FILE>(); } bool is_eos() const { return eos; } void set_eos(bool _eos) { eos = _eos; } - - // reuse current running scanner - // reset `eos` and `status` - // `first_block` is used to update `_free_blocks_memory_usage`, and take the first block size - // as the `_estimated_block_size`. It has updated `_free_blocks_memory_usage`, so don't reset. - void reuse_scanner(std::weak_ptr<ScannerDelegate> next_scanner) { - scanner = next_scanner; - eos = false; - status = Status::OK(); - } }; // ScannerContext is responsible for recording the execution status @@ -120,8 +106,12 @@ public: virtual ~ScannerContext() = default; virtual Status init(); - vectorized::BlockUPtr get_free_block(); + vectorized::BlockUPtr get_free_block(bool force); void return_free_block(vectorized::BlockUPtr block); + inline void inc_free_block_usage(size_t usage) { + _free_blocks_memory_usage += usage; + _free_blocks_memory_usage_mark->set(_free_blocks_memory_usage); + } // Get next block from blocks queue. Called by ScanNode/ScanOperator // Set eos to true if there is no more data to read. @@ -231,9 +221,8 @@ protected: RuntimeProfile::Counter* _scale_up_scanners_counter = nullptr; // for scaling up the running scanners - std::mutex _free_blocks_lock; size_t _estimated_block_size = 0; - int64_t _free_blocks_memory_usage = 0; + std::atomic_long _free_blocks_memory_usage = 0; int64_t _last_scale_up_time = 0; int64_t _last_fetch_time = 0; int64_t _total_wait_block_time = 0; diff --git a/be/src/vec/exec/scan/scanner_scheduler.cpp b/be/src/vec/exec/scan/scanner_scheduler.cpp index d3fd740b437..75bd04b0e7a 100644 --- a/be/src/vec/exec/scan/scanner_scheduler.cpp +++ b/be/src/vec/exec/scan/scanner_scheduler.cpp @@ -244,21 +244,21 @@ void ScannerScheduler::_scanner_scan(std::shared_ptr<ScannerContext> ctx, static_cast<void>(scanner->try_append_late_arrival_runtime_filter()); + size_t raw_bytes_threshold = config::doris_scanner_row_bytes; + size_t raw_bytes_read = 0; bool first_read = true; - while (!eos) { + while (!eos && raw_bytes_read < raw_bytes_threshold) { if (UNLIKELY(ctx->done())) { eos = true; break; } - BlockUPtr free_block = nullptr; - if (first_read) { - status = scanner->get_block_after_projects(state, scan_task->current_block.get(), &eos); - first_read = false; - } else { - free_block = ctx->get_free_block(); - status = scanner->get_block_after_projects(state, free_block.get(), &eos); + BlockUPtr free_block = ctx->get_free_block(first_read); + if (free_block == nullptr) { + break; } - + status = scanner->get_block_after_projects(state, free_block.get(), &eos); + raw_bytes_read += free_block->allocated_bytes(); + first_read = false; // The VFileScanner for external table may try to open not exist files, // Because FE file cache for external table may out of date. // So, NOT_FOUND for VFileScanner is not a fail case. @@ -275,15 +275,19 @@ void ScannerScheduler::_scanner_scan(std::shared_ptr<ScannerContext> ctx, eos = true; break; } - - if (!first_read && free_block) { - vectorized::MutableBlock mutable_block(scan_task->current_block.get()); + if (!scan_task->cached_blocks.empty() && + scan_task->cached_blocks.back()->rows() + free_block->rows() <= ctx->batch_size()) { + size_t block_size = scan_task->cached_blocks.back()->allocated_bytes(); + vectorized::MutableBlock mutable_block(scan_task->cached_blocks.back().get()); static_cast<void>(mutable_block.merge(*free_block)); - scan_task->current_block->set_columns(std::move(mutable_block.mutable_columns())); + scan_task->cached_blocks.back().get()->set_columns( + std::move(mutable_block.mutable_columns())); ctx->return_free_block(std::move(free_block)); - } - if (scan_task->current_block->rows() >= ctx->batch_size()) { - break; + ctx->inc_free_block_usage(scan_task->cached_blocks.back()->allocated_bytes() - + block_size); + } else { + ctx->inc_free_block_usage(free_block->allocated_bytes()); + scan_task->cached_blocks.push_back(std::move(free_block)); } } // end for while --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
