This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git

commit 17f042b85af413629dd2dceab233c46e48f5debf
Author: Ashin Gau <[email protected]>
AuthorDate: Mon Feb 26 16:32:51 2024 +0800

    [opt](scanner) scan enough blocks in each scan task (#31277)
---
 be/src/vec/exec/scan/scanner_context.cpp   | 151 ++++++++++++-----------------
 be/src/vec/exec/scan/scanner_context.h     |  27 ++----
 be/src/vec/exec/scan/scanner_scheduler.cpp |  36 ++++---
 3 files changed, 90 insertions(+), 124 deletions(-)

diff --git a/be/src/vec/exec/scan/scanner_context.cpp b/be/src/vec/exec/scan/scanner_context.cpp
index 45e934ee790..1edf77798d0 100644
--- a/be/src/vec/exec/scan/scanner_context.cpp
+++ b/be/src/vec/exec/scan/scanner_context.cpp
@@ -146,8 +146,7 @@ Status ScannerContext::init() {
     for (int i = 0; i < _max_thread_num; ++i) {
         std::weak_ptr<ScannerDelegate> next_scanner;
         if (_scanners.try_dequeue(next_scanner)) {
-            vectorized::BlockUPtr block = get_free_block();
-            submit_scan_task(std::make_shared<ScanTask>(next_scanner, 
std::move(block)));
+            submit_scan_task(std::make_shared<ScanTask>(next_scanner));
             _num_running_scanners++;
         }
     }
@@ -159,27 +158,27 @@ std::string ScannerContext::parent_name() {
     return _parent ? _parent->get_name() : _local_state->get_name();
 }
 
-vectorized::BlockUPtr ScannerContext::get_free_block() {
+vectorized::BlockUPtr ScannerContext::get_free_block(bool force) {
     vectorized::BlockUPtr block;
     if (_free_blocks.try_dequeue(block)) {
-        std::lock_guard<std::mutex> fl(_free_blocks_lock);
         DCHECK(block->mem_reuse());
         _free_blocks_memory_usage -= block->allocated_bytes();
         _free_blocks_memory_usage_mark->set(_free_blocks_memory_usage);
         return block;
     }
-
-    _newly_create_free_blocks_num->update(1);
-    return vectorized::Block::create_unique(_output_tuple_desc->slots(), 
_batch_size,
-                                            true /*ignore invalid slots*/);
+    if (_free_blocks_memory_usage < _max_bytes_in_queue || force) {
+        return vectorized::Block::create_unique(_output_tuple_desc->slots(), 
_batch_size,
+                                                true /*ignore invalid slots*/);
+        _newly_create_free_blocks_num->update(1);
+    }
+    return nullptr;
 }
 
 void ScannerContext::return_free_block(vectorized::BlockUPtr block) {
-    std::lock_guard<std::mutex> fl(_free_blocks_lock);
     if (block->mem_reuse() && _free_blocks_memory_usage < _max_bytes_in_queue) 
{
-        block->clear_column_data();
         _free_blocks_memory_usage += block->allocated_bytes();
         _free_blocks_memory_usage_mark->set(_free_blocks_memory_usage);
+        block->clear_column_data();
         _free_blocks.enqueue(std::move(block));
     }
 }
@@ -196,10 +195,15 @@ void ScannerContext::submit_scan_task(std::shared_ptr<ScanTask> scan_task) {
 }
 
 void ScannerContext::append_block_to_queue(std::shared_ptr<ScanTask> 
scan_task) {
-    if (scan_task->status_ok() && scan_task->current_block->rows() > 0) {
-        Status st = validate_block_schema(scan_task->current_block.get());
-        if (!st.ok()) {
-            scan_task->set_status(st);
+    if (scan_task->status_ok()) {
+        for (const vectorized::BlockUPtr& block : scan_task->cached_blocks) {
+            if (block->rows() > 0) {
+                Status st = validate_block_schema(block.get());
+                if (!st.ok()) {
+                    scan_task->set_status(st);
+                    break;
+                }
+            }
         }
     }
     std::lock_guard<std::mutex> l(_transfer_lock);
@@ -241,7 +245,6 @@ Status ScannerContext::get_block_from_queue(RuntimeState* state, vectorized::Blo
     if (!_blocks_queue.empty() && !done()) {
         _last_fetch_time = UnixMillis();
         scan_task = _blocks_queue.front();
-        _blocks_queue.pop_front();
     }
 
     if (scan_task) {
@@ -249,52 +252,43 @@ Status ScannerContext::get_block_from_queue(RuntimeState* state, vectorized::Blo
             _set_scanner_done();
             return scan_task->get_status();
         }
-        // We can only know the block size after reading at least one block
-        // Just take the size of first block as `_estimated_block_size`
-        if (scan_task->first_block) {
-            std::lock_guard<std::mutex> fl(_free_blocks_lock);
-            size_t block_size = scan_task->current_block->allocated_bytes();
-            _free_blocks_memory_usage += block_size;
-            _free_blocks_memory_usage_mark->set(_free_blocks_memory_usage);
-            scan_task->first_block = false;
-            if (block_size > _estimated_block_size) {
-                _estimated_block_size = block_size;
-            }
+        DCHECK(!scan_task->cached_blocks.empty());
+        vectorized::BlockUPtr current_block = 
std::move(scan_task->cached_blocks.front());
+        scan_task->cached_blocks.pop_front();
+        size_t block_size = current_block->allocated_bytes();
+        if (_estimated_block_size > block_size) {
+            _estimated_block_size = block_size;
         }
+        _free_blocks_memory_usage -= block_size;
+        _free_blocks_memory_usage_mark->set(_free_blocks_memory_usage);
         // consume current block
-        block->swap(*scan_task->current_block);
-        if (!scan_task->current_block->mem_reuse()) {
-            // it depends on the memory strategy of ScanNode/ScanOperator
-            // we should double check `mem_reuse()` of `current_block` to make 
sure it can be reused
-            _newly_create_free_blocks_num->update(1);
-            scan_task->current_block = 
vectorized::Block::create_unique(_output_tuple_desc->slots(),
-                                                                        
_batch_size, true);
-        }
-        if (scan_task->is_eos()) { // current scanner is finished, and no more 
data to read
-            _num_finished_scanners++;
-            std::weak_ptr<ScannerDelegate> next_scanner;
-            // submit one of the remaining scanners
-            if (_scanners.try_dequeue(next_scanner)) {
-                // reuse current running scanner, just reset some states.
-                scan_task->reuse_scanner(next_scanner);
-                submit_scan_task(scan_task);
-            } else {
-                // no more scanner to be scheduled
-                // `_free_blocks` serve all running scanners, maybe it's too 
large for the remaining scanners
-                int free_blocks_for_each = _free_blocks.size_approx() / 
_num_running_scanners;
-                _num_running_scanners--;
-                std::lock_guard<std::mutex> fl(_free_blocks_lock);
-                for (int i = 0; i < free_blocks_for_each; ++i) {
-                    vectorized::BlockUPtr removed_block;
-                    if (_free_blocks.try_dequeue(removed_block)) {
-                        _free_blocks_memory_usage -= block->allocated_bytes();
-                        
_free_blocks_memory_usage_mark->set(_free_blocks_memory_usage);
+        block->swap(*current_block);
+        return_free_block(std::move(current_block));
+        if (scan_task->cached_blocks.empty()) {
+            _blocks_queue.pop_front();
+            if (scan_task->is_eos()) { // current scanner is finished, and no 
more data to read
+                _num_finished_scanners++;
+                std::weak_ptr<ScannerDelegate> next_scanner;
+                // submit one of the remaining scanners
+                if (_scanners.try_dequeue(next_scanner)) {
+                    submit_scan_task(std::make_shared<ScanTask>(next_scanner));
+                } else {
+                    // no more scanner to be scheduled
+                    // `_free_blocks` serve all running scanners, maybe it's 
too large for the remaining scanners
+                    int free_blocks_for_each = _free_blocks.size_approx() / 
_num_running_scanners;
+                    _num_running_scanners--;
+                    for (int i = 0; i < free_blocks_for_each; ++i) {
+                        vectorized::BlockUPtr removed_block;
+                        if (_free_blocks.try_dequeue(removed_block)) {
+                            _free_blocks_memory_usage -= 
block->allocated_bytes();
+                            
_free_blocks_memory_usage_mark->set(_free_blocks_memory_usage);
+                        }
                     }
                 }
+            } else {
+                // resubmit current running scanner to read the next block
+                submit_scan_task(scan_task);
             }
-        } else {
-            // resubmit current running scanner to read the next block
-            submit_scan_task(scan_task);
         }
         // scale up
         _try_to_scale_up();
@@ -328,44 +322,23 @@ void ScannerContext::_try_to_scale_up() {
             return;
         }
 
-        std::lock_guard<std::mutex> fl(_free_blocks_lock);
         bool is_scale_up = false;
         // calculate the number of scanners that can be scheduled
         int num_add = std::min(_num_running_scanners * SCALE_UP_RATIO,
                                _max_thread_num * MAX_SCALE_UP_RATIO - 
_num_running_scanners);
-        num_add = std::max(num_add, 1);
+        if (_estimated_block_size > 0) {
+            int most_add =
+                    (_max_bytes_in_queue - _free_blocks_memory_usage) / 
_estimated_block_size;
+            num_add = std::min(num_add, most_add);
+        }
         for (int i = 0; i < num_add; ++i) {
-            vectorized::BlockUPtr allocate_block = nullptr;
-            // reuse block in `_free_blocks` firstly
-            if (!_free_blocks.try_dequeue(allocate_block)) {
-                if (_free_blocks_memory_usage < _max_bytes_in_queue) {
-                    _newly_create_free_blocks_num->update(1);
-                    allocate_block = 
vectorized::Block::create_unique(_output_tuple_desc->slots(),
-                                                                      
_batch_size, true);
-                }
-            } else {
-                // comes from `_free_blocks`, decrease first, then will be 
added back.
-                _free_blocks_memory_usage -= allocate_block->allocated_bytes();
-                _free_blocks_memory_usage_mark->set(_free_blocks_memory_usage);
-            }
-            if (allocate_block) {
-                // get enough memory to launch one more scanner.
-                std::weak_ptr<ScannerDelegate> scale_up_scanner;
-                if (_scanners.try_dequeue(scale_up_scanner)) {
-                    std::shared_ptr<ScanTask> scale_up_task =
-                            std::make_shared<ScanTask>(scale_up_scanner, 
std::move(allocate_block));
-                    _free_blocks_memory_usage += _estimated_block_size;
-                    
_free_blocks_memory_usage_mark->set(_free_blocks_memory_usage);
-                    // `first_block` is used to update 
`_free_blocks_memory_usage`,
-                    // we have got the `_estimated_block_size`, no need for 
further updates
-                    scale_up_task->first_block = false;
-                    submit_scan_task(scale_up_task);
-                    _num_running_scanners++;
-                    _scale_up_scanners_counter->update(1);
-                    is_scale_up = true;
-                } else {
-                    break;
-                }
+            // get enough memory to launch one more scanner.
+            std::weak_ptr<ScannerDelegate> scale_up_scanner;
+            if (_scanners.try_dequeue(scale_up_scanner)) {
+                submit_scan_task(std::make_shared<ScanTask>(scale_up_scanner));
+                _num_running_scanners++;
+                _scale_up_scanners_counter->update(1);
+                is_scale_up = true;
             } else {
                 break;
             }
diff --git a/be/src/vec/exec/scan/scanner_context.h b/be/src/vec/exec/scan/scanner_context.h
index ca80d6e748c..393920a7462 100644
--- a/be/src/vec/exec/scan/scanner_context.h
+++ b/be/src/vec/exec/scan/scanner_context.h
@@ -59,8 +59,7 @@ class SimplifiedScanScheduler;
 
 class ScanTask {
 public:
-    ScanTask(std::weak_ptr<ScannerDelegate> delegate_scanner, 
vectorized::BlockUPtr free_block)
-            : scanner(delegate_scanner), current_block(std::move(free_block)) 
{}
+    ScanTask(std::weak_ptr<ScannerDelegate> delegate_scanner) : 
scanner(delegate_scanner) {}
 
 private:
     // whether current scanner is finished
@@ -69,10 +68,7 @@ private:
 
 public:
     std::weak_ptr<ScannerDelegate> scanner;
-    // cache the block of current loop
-    vectorized::BlockUPtr current_block;
-    // only take the size of the first block as estimated size
-    bool first_block = true;
+    std::list<vectorized::BlockUPtr> cached_blocks;
     uint64_t last_submit_time; // nanoseconds
 
     void set_status(Status _status) {
@@ -86,16 +82,6 @@ public:
     bool status_ok() { return status.ok() || 
status.is<ErrorCode::END_OF_FILE>(); }
     bool is_eos() const { return eos; }
     void set_eos(bool _eos) { eos = _eos; }
-
-    // reuse current running scanner
-    // reset `eos` and `status`
-    // `first_block` is used to update `_free_blocks_memory_usage`, and take 
the first block size
-    // as the `_estimated_block_size`. It has updated 
`_free_blocks_memory_usage`, so don't reset.
-    void reuse_scanner(std::weak_ptr<ScannerDelegate> next_scanner) {
-        scanner = next_scanner;
-        eos = false;
-        status = Status::OK();
-    }
 };
 
 // ScannerContext is responsible for recording the execution status
@@ -120,8 +106,12 @@ public:
     virtual ~ScannerContext() = default;
     virtual Status init();
 
-    vectorized::BlockUPtr get_free_block();
+    vectorized::BlockUPtr get_free_block(bool force);
     void return_free_block(vectorized::BlockUPtr block);
+    inline void inc_free_block_usage(size_t usage) {
+        _free_blocks_memory_usage += usage;
+        _free_blocks_memory_usage_mark->set(_free_blocks_memory_usage);
+    }
 
     // Get next block from blocks queue. Called by ScanNode/ScanOperator
     // Set eos to true if there is no more data to read.
@@ -231,9 +221,8 @@ protected:
     RuntimeProfile::Counter* _scale_up_scanners_counter = nullptr;
 
     // for scaling up the running scanners
-    std::mutex _free_blocks_lock;
     size_t _estimated_block_size = 0;
-    int64_t _free_blocks_memory_usage = 0;
+    std::atomic_long _free_blocks_memory_usage = 0;
     int64_t _last_scale_up_time = 0;
     int64_t _last_fetch_time = 0;
     int64_t _total_wait_block_time = 0;
diff --git a/be/src/vec/exec/scan/scanner_scheduler.cpp b/be/src/vec/exec/scan/scanner_scheduler.cpp
index d3fd740b437..75bd04b0e7a 100644
--- a/be/src/vec/exec/scan/scanner_scheduler.cpp
+++ b/be/src/vec/exec/scan/scanner_scheduler.cpp
@@ -244,21 +244,21 @@ void ScannerScheduler::_scanner_scan(std::shared_ptr<ScannerContext> ctx,
 
     static_cast<void>(scanner->try_append_late_arrival_runtime_filter());
 
+    size_t raw_bytes_threshold = config::doris_scanner_row_bytes;
+    size_t raw_bytes_read = 0;
     bool first_read = true;
-    while (!eos) {
+    while (!eos && raw_bytes_read < raw_bytes_threshold) {
         if (UNLIKELY(ctx->done())) {
             eos = true;
             break;
         }
-        BlockUPtr free_block = nullptr;
-        if (first_read) {
-            status = scanner->get_block_after_projects(state, 
scan_task->current_block.get(), &eos);
-            first_read = false;
-        } else {
-            free_block = ctx->get_free_block();
-            status = scanner->get_block_after_projects(state, 
free_block.get(), &eos);
+        BlockUPtr free_block = ctx->get_free_block(first_read);
+        if (free_block == nullptr) {
+            break;
         }
-
+        status = scanner->get_block_after_projects(state, free_block.get(), 
&eos);
+        raw_bytes_read += free_block->allocated_bytes();
+        first_read = false;
         // The VFileScanner for external table may try to open not exist files,
         // Because FE file cache for external table may out of date.
         // So, NOT_FOUND for VFileScanner is not a fail case.
@@ -275,15 +275,19 @@ void ScannerScheduler::_scanner_scan(std::shared_ptr<ScannerContext> ctx,
             eos = true;
             break;
         }
-
-        if (!first_read && free_block) {
-            vectorized::MutableBlock 
mutable_block(scan_task->current_block.get());
+        if (!scan_task->cached_blocks.empty() &&
+            scan_task->cached_blocks.back()->rows() + free_block->rows() <= 
ctx->batch_size()) {
+            size_t block_size = 
scan_task->cached_blocks.back()->allocated_bytes();
+            vectorized::MutableBlock 
mutable_block(scan_task->cached_blocks.back().get());
             static_cast<void>(mutable_block.merge(*free_block));
-            
scan_task->current_block->set_columns(std::move(mutable_block.mutable_columns()));
+            scan_task->cached_blocks.back().get()->set_columns(
+                    std::move(mutable_block.mutable_columns()));
             ctx->return_free_block(std::move(free_block));
-        }
-        if (scan_task->current_block->rows() >= ctx->batch_size()) {
-            break;
+            
ctx->inc_free_block_usage(scan_task->cached_blocks.back()->allocated_bytes() -
+                                      block_size);
+        } else {
+            ctx->inc_free_block_usage(free_block->allocated_bytes());
+            scan_task->cached_blocks.push_back(std::move(free_block));
         }
     } // end for while
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to